00001
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #pragma once
00019
00020 #include <stack>
00021 #include "dbclient.h"
00022 #include "redef_macros.h"
00023
00024 #include "../util/background.h"
00025
00026 namespace mongo {
00027
00028 class Shard;
00029 class DBConnectionPool;
00030
00035 class PoolForHost {
00036 public:
00037 PoolForHost()
00038 : _created(0) {}
00039
00040 PoolForHost( const PoolForHost& other ) {
00041 assert(other._pool.size() == 0);
00042 _created = other._created;
00043 assert( _created == 0 );
00044 }
00045
00046 ~PoolForHost();
00047
00048 int numAvailable() const { return (int)_pool.size(); }
00049
00050 void createdOne( DBClientBase * base );
00051 long long numCreated() const { return _created; }
00052
00053 ConnectionString::ConnectionType type() const { assert(_created); return _type; }
00054
00058 DBClientBase * get( DBConnectionPool * pool , double socketTimeout );
00059
00060 void done( DBConnectionPool * pool , DBClientBase * c );
00061
00062 void flush();
00063
00064 void getStaleConnections( vector<DBClientBase*>& stale );
00065
00066 static void setMaxPerHost( unsigned max ) { _maxPerHost = max; }
00067 static unsigned getMaxPerHost() { return _maxPerHost; }
00068 private:
00069
00070 struct StoredConnection {
00071 StoredConnection( DBClientBase * c );
00072
00073 bool ok( time_t now );
00074
00075 DBClientBase* conn;
00076 time_t when;
00077 };
00078
00079 std::stack<StoredConnection> _pool;
00080
00081 long long _created;
00082 ConnectionString::ConnectionType _type;
00083
00084 static unsigned _maxPerHost;
00085 };
00086
00087 class DBConnectionHook {
00088 public:
00089 virtual ~DBConnectionHook() {}
00090 virtual void onCreate( DBClientBase * conn ) {}
00091 virtual void onHandedOut( DBClientBase * conn ) {}
00092 virtual void onDestory( DBClientBase * conn ) {}
00093 };
00094
00110 class DBConnectionPool : public PeriodicTask {
00111
00112 public:
00113
00114 DBConnectionPool();
00115 ~DBConnectionPool();
00116
00118 void setName( const string& name ) { _name = name; }
00119
00120 void onCreate( DBClientBase * conn );
00121 void onHandedOut( DBClientBase * conn );
00122 void onDestory( DBClientBase * conn );
00123
00124 void flush();
00125
00126 DBClientBase *get(const string& host, double socketTimeout = 0);
00127 DBClientBase *get(const ConnectionString& host, double socketTimeout = 0);
00128
00129 void release(const string& host, DBClientBase *c);
00130
00131 void addHook( DBConnectionHook * hook );
00132 void appendInfo( BSONObjBuilder& b );
00133
00135 struct serverNameCompare {
00136 bool operator()( const string& a , const string& b ) const;
00137 };
00138
00139 virtual string taskName() const { return "DBConnectionPool-cleaner"; }
00140 virtual void taskDoWork();
00141
00142 private:
00143 DBConnectionPool( DBConnectionPool& p );
00144
00145 DBClientBase* _get( const string& ident , double socketTimeout );
00146
00147 DBClientBase* _finishCreate( const string& ident , double socketTimeout, DBClientBase* conn );
00148
00149 struct PoolKey {
00150 PoolKey( string i , double t ) : ident( i ) , timeout( t ) {}
00151 string ident;
00152 double timeout;
00153 };
00154
00155 struct poolKeyCompare {
00156 bool operator()( const PoolKey& a , const PoolKey& b ) const;
00157 };
00158
00159 typedef map<PoolKey,PoolForHost,poolKeyCompare> PoolMap;
00160
00161 mongo::mutex _mutex;
00162 string _name;
00163
00164 PoolMap _pools;
00165
00166
00167
00168 list<DBConnectionHook*> * _hooks;
00169
00170 };
00171
00172 extern DBConnectionPool pool;
00173
00174 class AScopedConnection : boost::noncopyable {
00175 public:
00176 AScopedConnection() { _numConnections++; }
00177 virtual ~AScopedConnection() { _numConnections--; }
00178
00179 virtual DBClientBase* get() = 0;
00180 virtual void done() = 0;
00181 virtual string getHost() const = 0;
00182
00186 virtual bool ok() const = 0;
00187
00191 static int getNumConnections() { return _numConnections; }
00192
00193 private:
00194 static AtomicUInt _numConnections;
00195 };
00196
00201 class ScopedDbConnection : public AScopedConnection {
00202 public:
00206 explicit ScopedDbConnection(const string& host, double socketTimeout = 0) : _host(host), _conn( pool.get(host, socketTimeout) ), _socketTimeout( socketTimeout ) {
00207 _setSocketTimeout();
00208 }
00209
00210 ScopedDbConnection() : _host( "" ) , _conn(0), _socketTimeout( 0 ) {}
00211
00212
00213 ScopedDbConnection(const string& host, DBClientBase* conn, double socketTimeout = 0 ) : _host( host ) , _conn( conn ), _socketTimeout( socketTimeout ) {
00214 _setSocketTimeout();
00215 }
00216
00218 explicit ScopedDbConnection(const ConnectionString& url, double socketTimeout = 0 ) : _host(url.toString()), _conn( pool.get(url, socketTimeout) ), _socketTimeout( socketTimeout ) {
00219 _setSocketTimeout();
00220 }
00221
00223 explicit ScopedDbConnection(const Shard& shard, double socketTimeout = 0 );
00224 explicit ScopedDbConnection(const Shard* shard, double socketTimeout = 0 );
00225
00226 ~ScopedDbConnection();
00227
00229 DBClientBase* operator->() {
00230 uassert( 11004 , "connection was returned to the pool already" , _conn );
00231 return _conn;
00232 }
00233
00235 DBClientBase& conn() {
00236 uassert( 11005 , "connection was returned to the pool already" , _conn );
00237 return *_conn;
00238 }
00239
00241 DBClientBase* get() {
00242 uassert( 13102 , "connection was returned to the pool already" , _conn );
00243 return _conn;
00244 }
00245
00246 bool ok() const { return _conn > 0; }
00247
00248 string getHost() const { return _host; }
00249
00253 void kill() {
00254 delete _conn;
00255 _conn = 0;
00256 }
00257
00264 void done() {
00265 if ( ! _conn )
00266 return;
00267
00268
00269
00270
00271
00272
00273 pool.release(_host, _conn);
00274 _conn = 0;
00275 }
00276
00277 ScopedDbConnection * steal();
00278
00279 private:
00280
00281 void _setSocketTimeout();
00282
00283 const string _host;
00284 DBClientBase *_conn;
00285 const double _socketTimeout;
00286
00287 };
00288
00289 }
00290
00291 #include "undef_macros.h"