MongoDB  1.8.5
parallel.h
00001 // parallel.h
00002 
00003 /*    Copyright 2009 10gen Inc.
00004  *
00005  *    Licensed under the Apache License, Version 2.0 (the "License");
00006  *    you may not use this file except in compliance with the License.
00007  *    You may obtain a copy of the License at
00008  *
00009  *    http://www.apache.org/licenses/LICENSE-2.0
00010  *
00011  *    Unless required by applicable law or agreed to in writing, software
00012  *    distributed under the License is distributed on an "AS IS" BASIS,
00013  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014  *    See the License for the specific language governing permissions and
00015  *    limitations under the License.
00016  */
00017 
00022 #include "../pch.h"
00023 #include "dbclient.h"
00024 #include "redef_macros.h"
00025 #include "../db/dbmessage.h"
00026 #include "../db/matcher.h"
00027 #include "../util/concurrency/mvar.h"
00028 
00029 namespace mongo {
00030 
00034     class ServerAndQuery {
00035     public:
00036         ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
00037             _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) {
00038         }
00039 
00040         bool operator<( const ServerAndQuery& other ) const {
00041             if ( ! _orderObject.isEmpty() )
00042                 return _orderObject.woCompare( other._orderObject ) < 0;
00043 
00044             if ( _server < other._server )
00045                 return true;
00046             if ( other._server > _server )
00047                 return false;
00048             return _extra.woCompare( other._extra ) < 0;
00049         }
00050 
00051         string toString() const {
00052             StringBuilder ss;
00053             ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
00054             return ss.str();
00055         }
00056 
00057         operator string() const {
00058             return toString();
00059         }
00060 
00061         string _server;
00062         BSONObj _extra;
00063         BSONObj _orderObject;
00064     };
00065 
00070     class ClusteredCursor {
00071     public:
00072         ClusteredCursor( QueryMessage& q );
00073         ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
00074         virtual ~ClusteredCursor();
00075 
00077         void init();
00078 
00079         virtual bool more() = 0;
00080         virtual BSONObj next() = 0;
00081 
00082         static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
00083 
00084         virtual string type() const = 0;
00085 
00086         virtual BSONObj explain();
00087 
00088     protected:
00089 
00090         virtual void _init() = 0;
00091 
00092         auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
00093         BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
00094 
00095         static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
00096 
00097         virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
00098 
00099         string _ns;
00100         BSONObj _query;
00101         int _options;
00102         BSONObj _fields;
00103         int _batchSize;
00104 
00105         bool _didInit;
00106 
00107         bool _done;
00108     };
00109 
00110 
00111     class FilteringClientCursor {
00112     public:
00113         FilteringClientCursor( const BSONObj filter = BSONObj() );
00114         FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
00115         ~FilteringClientCursor();
00116 
00117         void reset( auto_ptr<DBClientCursor> cursor );
00118 
00119         bool more();
00120         BSONObj next();
00121 
00122         BSONObj peek();
00123     private:
00124         void _advance();
00125 
00126         Matcher _matcher;
00127         auto_ptr<DBClientCursor> _cursor;
00128 
00129         BSONObj _next;
00130         bool _done;
00131     };
00132 
00133 
00134     class Servers {
00135     public:
00136         Servers() {
00137         }
00138 
00139         void add( const ServerAndQuery& s ) {
00140             add( s._server , s._extra );
00141         }
00142 
00143         void add( const string& server , const BSONObj& filter ) {
00144             vector<BSONObj>& mine = _filters[server];
00145             mine.push_back( filter.getOwned() );
00146         }
00147 
00148         // TOOO: pick a less horrible name
00149         class View {
00150             View( const Servers* s ) {
00151                 for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) {
00152                     _servers.push_back( i->first );
00153                     _filters.push_back( i->second );
00154                 }
00155             }
00156         public:
00157             int size() const {
00158                 return _servers.size();
00159             }
00160 
00161             string getServer( int n ) const {
00162                 return _servers[n];
00163             }
00164 
00165             vector<BSONObj> getFilter( int n ) const {
00166                 return _filters[ n ];
00167             }
00168 
00169         private:
00170             vector<string> _servers;
00171             vector< vector<BSONObj> > _filters;
00172 
00173             friend class Servers;
00174         };
00175 
00176         View view() const {
00177             return View( this );
00178         }
00179 
00180 
00181     private:
00182         map<string, vector<BSONObj> > _filters;
00183 
00184         friend class View;
00185     };
00186 
00187 
00192     class SerialServerClusteredCursor : public ClusteredCursor {
00193     public:
00194         SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder=0);
00195         virtual bool more();
00196         virtual BSONObj next();
00197         virtual string type() const { return "SerialServer"; }
00198 
00199     protected:
00200         virtual void _explain( map< string,list<BSONObj> >& out );
00201 
00202         void _init() {}
00203 
00204         vector<ServerAndQuery> _servers;
00205         unsigned _serverIndex;
00206 
00207         FilteringClientCursor _current;
00208 
00209         int _needToSkip;
00210     };
00211 
00212 
00217     class ParallelSortClusteredCursor : public ClusteredCursor {
00218     public:
00219         ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey );
00220         ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
00221                                      const Query& q , int options=0, const BSONObj& fields=BSONObj() );
00222         virtual ~ParallelSortClusteredCursor();
00223         virtual bool more();
00224         virtual BSONObj next();
00225         virtual string type() const { return "ParallelSort"; }
00226     protected:
00227         void _finishCons();
00228         void _init();
00229 
00230         virtual void _explain( map< string,list<BSONObj> >& out );
00231 
00232         int _numServers;
00233         set<ServerAndQuery> _servers;
00234         BSONObj _sortKey;
00235 
00236         FilteringClientCursor * _cursors;
00237         int _needToSkip;
00238     };
00239 
00245     class Future {
00246     public:
00247         class CommandResult {
00248         public:
00249 
00250             string getServer() const { return _server; }
00251 
00252             bool isDone() const { return _done; }
00253 
00254             bool ok() const {
00255                 assert( _done );
00256                 return _ok;
00257             }
00258 
00259             BSONObj result() const {
00260                 assert( _done );
00261                 return _res;
00262             }
00263 
00268             bool join();
00269 
00270         private:
00271 
00272             CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn );
00273 
00274             string _server;
00275             string _db;
00276             BSONObj _cmd;
00277             DBClientBase * _conn;
00278 
00279             scoped_ptr<boost::thread> _thr;
00280 
00281             BSONObj _res;
00282             bool _ok;
00283             bool _done;
00284 
00285             friend class Future;
00286         };
00287 
00288         static void commandThread(shared_ptr<CommandResult> res);
00289         
00296         static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 );
00297     };
00298 
00299 
00300 }
00301 
00302 #include "undef_macros.h"