00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00022 #include "../pch.h"
00023 #include "dbclient.h"
00024 #include "redef_macros.h"
00025 #include "../db/dbmessage.h"
00026 #include "../db/matcher.h"
00027 #include "../util/concurrency/mvar.h"
00028
00029 namespace mongo {
00030
00034 class ServerAndQuery {
00035 public:
00036 ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
00037 _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) {
00038 }
00039
00040 bool operator<( const ServerAndQuery& other ) const {
00041 if ( ! _orderObject.isEmpty() )
00042 return _orderObject.woCompare( other._orderObject ) < 0;
00043
00044 if ( _server < other._server )
00045 return true;
00046 if ( other._server > _server )
00047 return false;
00048 return _extra.woCompare( other._extra ) < 0;
00049 }
00050
00051 string toString() const {
00052 StringBuilder ss;
00053 ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
00054 return ss.str();
00055 }
00056
00057 operator string() const {
00058 return toString();
00059 }
00060
00061 string _server;
00062 BSONObj _extra;
00063 BSONObj _orderObject;
00064 };
00065
00070 class ClusteredCursor {
00071 public:
00072 ClusteredCursor( QueryMessage& q );
00073 ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
00074 virtual ~ClusteredCursor();
00075
00077 void init();
00078
00079 virtual bool more() = 0;
00080 virtual BSONObj next() = 0;
00081
00082 static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
00083
00084 virtual string type() const = 0;
00085
00086 virtual BSONObj explain();
00087
00088 protected:
00089
00090 virtual void _init() = 0;
00091
00092 auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 , bool lazy=false );
00093 BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
00094
00099 void _checkCursor( DBClientCursor * cursor );
00100
00101 static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
00102
00103 virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
00104
00105 string _ns;
00106 BSONObj _query;
00107 int _options;
00108 BSONObj _fields;
00109 int _batchSize;
00110
00111 bool _didInit;
00112
00113 bool _done;
00114 };
00115
00116
00117 class FilteringClientCursor {
00118 public:
00119 FilteringClientCursor( const BSONObj filter = BSONObj() );
00120 FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter = BSONObj() );
00121 FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
00122 ~FilteringClientCursor();
00123
00124 void reset( auto_ptr<DBClientCursor> cursor );
00125 void reset( DBClientCursor* cursor );
00126
00127 bool more();
00128 BSONObj next();
00129
00130 BSONObj peek();
00131
00132 DBClientCursor* raw() { return _cursor.get(); }
00133
00134 private:
00135 void _advance();
00136
00137 Matcher _matcher;
00138 auto_ptr<DBClientCursor> _cursor;
00139
00140 BSONObj _next;
00141 bool _done;
00142 };
00143
00144
00145 class Servers {
00146 public:
00147 Servers() {
00148 }
00149
00150 void add( const ServerAndQuery& s ) {
00151 add( s._server , s._extra );
00152 }
00153
00154 void add( const string& server , const BSONObj& filter ) {
00155 vector<BSONObj>& mine = _filters[server];
00156 mine.push_back( filter.getOwned() );
00157 }
00158
00159
00160 class View {
00161 View( const Servers* s ) {
00162 for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) {
00163 _servers.push_back( i->first );
00164 _filters.push_back( i->second );
00165 }
00166 }
00167 public:
00168 int size() const {
00169 return _servers.size();
00170 }
00171
00172 string getServer( int n ) const {
00173 return _servers[n];
00174 }
00175
00176 vector<BSONObj> getFilter( int n ) const {
00177 return _filters[ n ];
00178 }
00179
00180 private:
00181 vector<string> _servers;
00182 vector< vector<BSONObj> > _filters;
00183
00184 friend class Servers;
00185 };
00186
00187 View view() const {
00188 return View( this );
00189 }
00190
00191
00192 private:
00193 map<string, vector<BSONObj> > _filters;
00194
00195 friend class View;
00196 };
00197
00198
00203 class SerialServerClusteredCursor : public ClusteredCursor {
00204 public:
00205 SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder=0);
00206 virtual bool more();
00207 virtual BSONObj next();
00208 virtual string type() const { return "SerialServer"; }
00209
00210 protected:
00211 virtual void _explain( map< string,list<BSONObj> >& out );
00212
00213 void _init() {}
00214
00215 vector<ServerAndQuery> _servers;
00216 unsigned _serverIndex;
00217
00218 FilteringClientCursor _current;
00219
00220 int _needToSkip;
00221 };
00222
00223
00228 class ParallelSortClusteredCursor : public ClusteredCursor {
00229 public:
00230 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey );
00231 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
00232 const Query& q , int options=0, const BSONObj& fields=BSONObj() );
00233 virtual ~ParallelSortClusteredCursor();
00234 virtual bool more();
00235 virtual BSONObj next();
00236 virtual string type() const { return "ParallelSort"; }
00237 protected:
00238 void _finishCons();
00239 void _init();
00240
00241 virtual void _explain( map< string,list<BSONObj> >& out );
00242
00243 int _numServers;
00244 set<ServerAndQuery> _servers;
00245 BSONObj _sortKey;
00246
00247 FilteringClientCursor * _cursors;
00248 int _needToSkip;
00249 };
00250
00256 class Future {
00257 public:
00258 class CommandResult {
00259 public:
00260
00261 string getServer() const { return _server; }
00262
00263 bool isDone() const { return _done; }
00264
00265 bool ok() const {
00266 assert( _done );
00267 return _ok;
00268 }
00269
00270 BSONObj result() const {
00271 assert( _done );
00272 return _res;
00273 }
00274
00279 bool join();
00280
00281 private:
00282
00283 CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn );
00284
00285 string _server;
00286 string _db;
00287 int _options;
00288 BSONObj _cmd;
00289 DBClientBase * _conn;
00290 scoped_ptr<ScopedDbConnection> _connHolder;
00291
00292 scoped_ptr<DBClientCursor> _cursor;
00293
00294 BSONObj _res;
00295 bool _ok;
00296 bool _done;
00297
00298 friend class Future;
00299 };
00300
00301
00308 static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn = 0 );
00309 };
00310
00311
00312 }
00313
00314 #include "undef_macros.h"