|
MongoDB
1.8.5
|
00001 // parallel.h 00002 00003 /* Copyright 2009 10gen Inc. 00004 * 00005 * Licensed under the Apache License, Version 2.0 (the "License"); 00006 * you may not use this file except in compliance with the License. 00007 * You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00022 #include "../pch.h" 00023 #include "dbclient.h" 00024 #include "redef_macros.h" 00025 #include "../db/dbmessage.h" 00026 #include "../db/matcher.h" 00027 #include "../util/concurrency/mvar.h" 00028 00029 namespace mongo { 00030 00034 class ServerAndQuery { 00035 public: 00036 ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : 00037 _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) { 00038 } 00039 00040 bool operator<( const ServerAndQuery& other ) const { 00041 if ( ! _orderObject.isEmpty() ) 00042 return _orderObject.woCompare( other._orderObject ) < 0; 00043 00044 if ( _server < other._server ) 00045 return true; 00046 if ( other._server > _server ) 00047 return false; 00048 return _extra.woCompare( other._extra ) < 0; 00049 } 00050 00051 string toString() const { 00052 StringBuilder ss; 00053 ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString(); 00054 return ss.str(); 00055 } 00056 00057 operator string() const { 00058 return toString(); 00059 } 00060 00061 string _server; 00062 BSONObj _extra; 00063 BSONObj _orderObject; 00064 }; 00065 00070 class ClusteredCursor { 00071 public: 00072 ClusteredCursor( QueryMessage& q ); 00073 ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); 00074 virtual ~ClusteredCursor(); 00075 00077 void init(); 00078 00079 virtual bool more() = 0; 00080 virtual BSONObj next() = 0; 00081 00082 static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); 00083 00084 virtual string type() const = 0; 00085 00086 virtual BSONObj explain(); 00087 00088 protected: 00089 00090 virtual void _init() = 0; 00091 00092 auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); 00093 BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); 00094 00095 static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); 00096 00097 virtual void _explain( map< string,list<BSONObj> >& out ) = 0; 00098 00099 string _ns; 00100 BSONObj _query; 00101 int _options; 00102 BSONObj _fields; 00103 int _batchSize; 00104 00105 bool _didInit; 00106 00107 bool _done; 00108 }; 00109 00110 00111 class FilteringClientCursor { 00112 public: 00113 FilteringClientCursor( const BSONObj filter = BSONObj() ); 00114 FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); 00115 ~FilteringClientCursor(); 00116 00117 void reset( auto_ptr<DBClientCursor> cursor ); 00118 00119 bool more(); 00120 BSONObj next(); 00121 00122 BSONObj peek(); 00123 private: 00124 void _advance(); 00125 00126 Matcher _matcher; 00127 auto_ptr<DBClientCursor> _cursor; 00128 00129 BSONObj _next; 00130 bool _done; 00131 }; 00132 00133 00134 class Servers { 00135 public: 00136 Servers() { 00137 } 00138 00139 void add( const ServerAndQuery& s ) { 00140 add( s._server , s._extra ); 00141 } 00142 00143 void add( const string& server , const BSONObj& filter ) { 00144 vector<BSONObj>& mine = _filters[server]; 00145 mine.push_back( filter.getOwned() ); 00146 } 00147 00148 // TOOO: pick a less horrible name 00149 class View { 00150 View( const Servers* s ) { 00151 for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) { 00152 _servers.push_back( i->first ); 00153 _filters.push_back( i->second ); 00154 } 00155 } 00156 public: 00157 int size() const { 00158 return _servers.size(); 00159 } 00160 00161 string getServer( int n ) const { 00162 return _servers[n]; 00163 } 00164 00165 vector<BSONObj> getFilter( int n ) const { 00166 return _filters[ n ]; 00167 } 00168 00169 private: 00170 vector<string> _servers; 00171 vector< vector<BSONObj> > _filters; 00172 00173 friend class Servers; 00174 }; 00175 00176 View view() const { 00177 return View( this ); 00178 } 00179 00180 00181 private: 00182 map<string, vector<BSONObj> > _filters; 00183 00184 friend class View; 00185 }; 00186 00187 00192 class SerialServerClusteredCursor : public ClusteredCursor { 00193 public: 00194 SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder=0); 00195 virtual bool more(); 00196 virtual BSONObj next(); 00197 virtual string type() const { return "SerialServer"; } 00198 00199 protected: 00200 virtual void _explain( map< string,list<BSONObj> >& out ); 00201 00202 void _init() {} 00203 00204 vector<ServerAndQuery> _servers; 00205 unsigned _serverIndex; 00206 00207 FilteringClientCursor _current; 00208 00209 int _needToSkip; 00210 }; 00211 00212 00217 class ParallelSortClusteredCursor : public ClusteredCursor { 00218 public: 00219 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey ); 00220 ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , 00221 const Query& q , int options=0, const BSONObj& fields=BSONObj() ); 00222 virtual ~ParallelSortClusteredCursor(); 00223 virtual bool more(); 00224 virtual BSONObj next(); 00225 virtual string type() const { return "ParallelSort"; } 00226 protected: 00227 void _finishCons(); 00228 void _init(); 00229 00230 virtual void _explain( map< string,list<BSONObj> >& out ); 00231 00232 int _numServers; 00233 set<ServerAndQuery> _servers; 00234 BSONObj _sortKey; 00235 00236 FilteringClientCursor * _cursors; 00237 int _needToSkip; 00238 }; 00239 00245 class Future { 00246 public: 00247 class CommandResult { 00248 public: 00249 00250 string getServer() const { return _server; } 00251 00252 bool isDone() const { return _done; } 00253 00254 bool ok() const { 00255 assert( _done ); 00256 return _ok; 00257 } 00258 00259 BSONObj result() const { 00260 assert( _done ); 00261 return _res; 00262 } 00263 00268 bool join(); 00269 00270 private: 00271 00272 CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ); 00273 00274 string _server; 00275 string _db; 00276 BSONObj _cmd; 00277 DBClientBase * _conn; 00278 00279 scoped_ptr<boost::thread> _thr; 00280 00281 BSONObj _res; 00282 bool _ok; 00283 bool _done; 00284 00285 friend class Future; 00286 }; 00287 00288 static void commandThread(shared_ptr<CommandResult> res); 00289 00296 static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 ); 00297 }; 00298 00299 00300 } 00301 00302 #include "undef_macros.h"
1.7.5.1