|
MongoDB
2.1.1-pre-
|
00001 00006 /* Copyright 2009 10gen Inc. 00007 * 00008 * Licensed under the Apache License, Version 2.0 (the "License"); 00009 * you may not use this file except in compliance with the License. 00010 * You may obtain a copy of the License at 00011 * 00012 * http://www.apache.org/licenses/LICENSE-2.0 00013 * 00014 * Unless required by applicable law or agreed to in writing, software 00015 * distributed under the License is distributed on an "AS IS" BASIS, 00016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00017 * See the License for the specific language governing permissions and 00018 * limitations under the License. 00019 */ 00020 00021 #pragma once 00022 00023 #include "../pch.h" 00024 #include "../util/net/message.h" 00025 #include "../util/net/message_port.h" 00026 #include "../db/jsobj.h" 00027 #include "../db/json.h" 00028 #include "../db/security.h" 00029 #include <stack> 00030 00031 namespace mongo { 00032 00034 enum QueryOptions { 00043 QueryOption_CursorTailable = 1 << 1, 00044 00047 QueryOption_SlaveOk = 1 << 2, 00048 00049 // findingStart mode is used to find the first operation of interest when 00050 // we are scanning through a repl log. For efficiency in the common case, 00051 // where the first operation of interest is closer to the tail than the head, 00052 // we start from the tail of the log and work backwards until we find the 00053 // first operation of interest. Then we scan forward from that first operation, 00054 // actually returning results to the client. During the findingStart phase, 00055 // we release the db mutex occasionally to avoid blocking the db process for 00056 // an extended period of time. 00057 QueryOption_OplogReplay = 1 << 3, 00058 00062 QueryOption_NoCursorTimeout = 1 << 4, 00063 00067 QueryOption_AwaitData = 1 << 5, 00068 00076 QueryOption_Exhaust = 1 << 6, 00077 00082 QueryOption_PartialResults = 1 << 7 , 00083 00084 QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust | QueryOption_PartialResults 00085 00086 }; 00087 00088 enum UpdateOptions { 00090 UpdateOption_Upsert = 1 << 0, 00091 00094 UpdateOption_Multi = 1 << 1, 00095 00097 UpdateOption_Broadcast = 1 << 2 00098 }; 00099 00100 enum RemoveOptions { 00102 RemoveOption_JustOne = 1 << 0, 00103 00105 RemoveOption_Broadcast = 1 << 1 00106 }; 00107 00108 00112 enum InsertOptions { 00114 InsertOption_ContinueOnError = 1 << 0 00115 }; 00116 00117 class DBClientBase; 00118 00133 class ConnectionString { 00134 public: 00135 enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC }; 00136 00137 ConnectionString() { 00138 _type = INVALID; 00139 } 00140 00141 ConnectionString( const HostAndPort& server ) { 00142 _type = MASTER; 00143 _servers.push_back( server ); 00144 _finishInit(); 00145 } 00146 00147 ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) { 00148 _type = type; 00149 _setName = setName; 00150 _fillServers( s ); 00151 00152 switch ( _type ) { 00153 case MASTER: 00154 assert( _servers.size() == 1 ); 00155 break; 00156 case SET: 00157 assert( _setName.size() ); 00158 assert( _servers.size() >= 1 ); // 1 is ok since we can derive 00159 break; 00160 case PAIR: 00161 assert( _servers.size() == 2 ); 00162 break; 00163 default: 00164 assert( _servers.size() > 0 ); 00165 } 00166 00167 _finishInit(); 00168 } 00169 00170 ConnectionString( const string& s , ConnectionType favoredMultipleType ) { 00171 _type = INVALID; 00172 00173 _fillServers( s ); 00174 if ( _type != INVALID ) { 00175 // set already 00176 } 00177 else if ( _servers.size() == 1 ) { 00178 _type = MASTER; 00179 } 00180 else { 00181 _type = favoredMultipleType; 00182 assert( _type == SET || _type == SYNC ); 00183 } 00184 _finishInit(); 00185 } 00186 00187 bool isValid() const { return _type != INVALID; } 00188 00189 string toString() const { return _string; } 00190 00191 DBClientBase* connect( string& errmsg, double socketTimeout = 0 ) const; 00192 00193 string getSetName() const { return _setName; } 00194 00195 vector<HostAndPort> getServers() const { return _servers; } 00196 00197 ConnectionType type() const { return _type; } 00198 00199 static ConnectionString parse( const string& url , string& errmsg ); 00200 00201 static string typeToString( ConnectionType type ); 00202 00203 private: 00204 00205 void _fillServers( string s ); 00206 void _finishInit(); 00207 00208 ConnectionType _type; 00209 vector<HostAndPort> _servers; 00210 string _string; 00211 string _setName; 00212 }; 00213 00218 enum WriteConcern { 00219 W_NONE = 0 , // TODO: not every connection type fully supports this 00220 W_NORMAL = 1 00221 // TODO SAFE = 2 00222 }; 00223 00224 class BSONObj; 00225 class ScopedDbConnection; 00226 class DBClientCursor; 00227 class DBClientCursorBatchIterator; 00228 00234 class Query { 00235 public: 00236 BSONObj obj; 00237 Query() : obj(BSONObj()) { } 00238 Query(const BSONObj& b) : obj(b) { } 00239 Query(const string &json) : 00240 obj(fromjson(json)) { } 00241 Query(const char * json) : 00242 obj(fromjson(json)) { } 00243 00252 Query& sort(const BSONObj& sortPattern); 00253 00259 Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; } 00260 00266 Query& hint(BSONObj keyPattern); 00267 Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); } 00268 00272 Query& minKey(const BSONObj &val); 00276 Query& maxKey(const BSONObj &val); 00277 00281 Query& explain(); 00282 00291 Query& snapshot(); 00292 00309 Query& where(const string &jscode, BSONObj scope); 00310 Query& where(const string &jscode) { return where(jscode, BSONObj()); } 00311 00315 bool isComplex( bool * hasDollar = 0 ) const; 00316 00317 BSONObj getFilter() const; 00318 BSONObj getSort() const; 00319 BSONObj getHint() const; 00320 bool isExplain() const; 00321 00322 string toString() const; 00323 operator string() const { return toString(); } 00324 private: 00325 void makeComplex(); 00326 template< class T > 00327 void appendComplex( const char *fieldName, const T& val ) { 00328 makeComplex(); 00329 BSONObjBuilder b; 00330 b.appendElements(obj); 00331 b.append(fieldName, val); 00332 obj = b.obj(); 00333 } 00334 }; 00335 00340 class QuerySpec { 00341 00342 string _ns; 00343 int _ntoskip; 00344 int _ntoreturn; 00345 int _options; 00346 BSONObj _query; 00347 BSONObj _fields; 00348 Query _queryObj; 00349 00350 public: 00351 00352 QuerySpec( const string& ns, 00353 const BSONObj& query, const BSONObj& fields, 00354 int ntoskip, int ntoreturn, int options ) 00355 : _ns( ns ), _ntoskip( ntoskip ), _ntoreturn( ntoreturn ), _options( options ), 00356 _query( query.getOwned() ), _fields( fields.getOwned() ) , _queryObj( _query ) { 00357 } 00358 00359 QuerySpec() {} 00360 00361 bool isEmpty() const { return _ns.size() == 0; } 00362 00363 bool isExplain() const { return _queryObj.isExplain(); } 00364 BSONObj filter() const { return _queryObj.getFilter(); } 00365 00366 BSONObj hint() const { return _queryObj.getHint(); } 00367 BSONObj sort() const { return _queryObj.getSort(); } 00368 BSONObj query() const { return _query; } 00369 BSONObj fields() const { return _fields; } 00370 BSONObj* fieldsData() { return &_fields; } 00371 00372 // don't love this, but needed downstrem 00373 const BSONObj* fieldsPtr() const { return &_fields; } 00374 00375 string ns() const { return _ns; } 00376 int ntoskip() const { return _ntoskip; } 00377 int ntoreturn() const { return _ntoreturn; } 00378 int options() const { return _options; } 00379 00380 void setFields( BSONObj& o ) { _fields = o.getOwned(); } 00381 00382 string toString() const { 00383 return str::stream() << "QSpec " << 00384 BSON( "ns" << _ns << "n2skip" << _ntoskip << "n2return" << _ntoreturn << "options" << _options 00385 << "query" << _query << "fields" << _fields ); 00386 } 00387 00388 }; 00389 00390 00394 #define QUERY(x) mongo::Query( BSON(x) ) 00395 00396 // Useful utilities for namespaces 00398 string nsGetDB( const string &ns ); 00399 00401 string nsGetCollection( const string &ns ); 00402 00406 class DBConnector { 00407 public: 00408 virtual ~DBConnector() {} 00410 virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0; 00411 virtual void say( Message &toSend, bool isRetry = false , string * actualServer = 0 ) = 0; 00412 virtual void sayPiggyBack( Message &toSend ) = 0; 00413 /* used by QueryOption_Exhaust. To use that your subclass must implement this. */ 00414 virtual bool recv( Message& m ) { assert(false); return false; } 00415 // In general, for lazy queries, we'll need to say, recv, then checkResponse 00416 virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ) { 00417 if( retry ) *retry = false; if( targetHost ) *targetHost = ""; 00418 } 00419 virtual bool lazySupported() const = 0; 00420 }; 00421 00425 class DBClientInterface : boost::noncopyable { 00426 public: 00427 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, 00428 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0; 00429 00430 virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0; 00431 00432 virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0; 00433 00434 virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0; 00435 00436 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0; 00437 00438 virtual ~DBClientInterface() { } 00439 00444 virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00445 00449 void findN(vector<BSONObj>& out, const string&ns, Query query, int nToReturn, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00450 00451 virtual string getServerAddress() const = 0; 00452 00454 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; 00455 }; 00456 00461 class DBClientWithCommands : public DBClientInterface { 00462 set<string> _seenIndexes; 00463 public: 00465 int _logLevel; 00466 00467 DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { } 00468 00475 bool simpleCommand(const string &dbname, BSONObj *info, const string &command); 00476 00488 virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); 00489 00499 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true, Auth::Level * level = NULL); 00500 00504 virtual unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0, int limit=0, int skip=0 ); 00505 00506 string createPasswordDigest( const string &username , const string &clearTextPassword ); 00507 00516 virtual bool isMaster(bool& isMaster, BSONObj *info=0); 00517 00534 bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0); 00535 00539 string getLastError(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0); 00540 00547 virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0); 00548 00552 static string getLastErrorString( const BSONObj& res ); 00553 00560 BSONObj getPrevError(); 00561 00566 bool resetError() { return simpleCommand("admin", 0, "reseterror"); } 00567 00569 virtual bool dropCollection( const string &ns ) { 00570 string db = nsGetDB( ns ); 00571 string coll = nsGetCollection( ns ); 00572 uassert( 10011 , "no collection name", coll.size() ); 00573 00574 BSONObj info; 00575 00576 bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info ); 00577 resetIndexCache(); 00578 return res; 00579 } 00580 00584 bool repairDatabase(const string &dbname, BSONObj *info = 0) { 00585 return simpleCommand(dbname, info, "repairDatabase"); 00586 } 00587 00607 bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0); 00608 00613 enum ProfilingLevel { 00614 ProfileOff = 0, 00615 ProfileSlow = 1, // log very slow (>100ms) operations 00616 ProfileAll = 2 00617 00618 }; 00619 bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); 00620 bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); 00621 00622 00626 struct MROutput { 00627 MROutput(const char* collection) : out(BSON("replace" << collection)) {} 00628 MROutput(const string& collection) : out(BSON("replace" << collection)) {} 00629 MROutput(const BSONObj& obj) : out(obj) {} 00630 00631 BSONObj out; 00632 }; 00633 static MROutput MRInline; 00634 00658 BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), MROutput output = MRInline); 00659 00675 bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0); 00676 00680 bool validate( const string &ns , bool scandata=true ) { 00681 BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata ); 00682 BSONObj info; 00683 return runCommand( nsGetDB( ns ).c_str() , cmd , info ); 00684 } 00685 00686 /* The following helpers are simply more convenient forms of eval() for certain common cases */ 00687 00688 /* invocation with no return value of interest -- with or without one simple parameter */ 00689 bool eval(const string &dbname, const string &jscode); 00690 template< class T > 00691 bool eval(const string &dbname, const string &jscode, T parm1) { 00692 BSONObj info; 00693 BSONElement retValue; 00694 BSONObjBuilder b; 00695 b.append("0", parm1); 00696 BSONObj args = b.done(); 00697 return eval(dbname, jscode, info, retValue, &args); 00698 } 00699 00701 template< class T, class NumType > 00702 bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) { 00703 BSONObj info; 00704 BSONElement retValue; 00705 BSONObjBuilder b; 00706 b.append("0", parm1); 00707 BSONObj args = b.done(); 00708 if ( !eval(dbname, jscode, info, retValue, &args) ) 00709 return false; 00710 ret = (NumType) retValue.number(); 00711 return true; 00712 } 00713 00719 list<string> getDatabaseNames(); 00720 00724 list<string> getCollectionNames( const string& db ); 00725 00726 bool exists( const string& ns ); 00727 00741 virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "", 00742 bool cache = true, bool background = false, int v = -1 ); 00743 00747 virtual void resetIndexCache(); 00748 00749 virtual auto_ptr<DBClientCursor> getIndexes( const string &ns ); 00750 00751 virtual void dropIndex( const string& ns , BSONObj keys ); 00752 virtual void dropIndex( const string& ns , const string& indexName ); 00753 00757 virtual void dropIndexes( const string& ns ); 00758 00759 virtual void reIndex( const string& ns ); 00760 00761 string genIndexName( const BSONObj& keys ); 00762 00764 virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) { 00765 bool ret = simpleCommand(dbname, info, "dropDatabase"); 00766 resetIndexCache(); 00767 return ret; 00768 } 00769 00770 virtual string toString() = 0; 00771 00772 protected: 00774 bool isOk(const BSONObj&); 00775 00777 bool isNotMasterErrorString( const BSONElement& e ); 00778 00779 BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip ); 00780 00781 enum QueryOptions availableOptions(); 00782 00783 private: 00784 enum QueryOptions _cachedAvailableOptions; 00785 bool _haveCachedAvailableOptions; 00786 }; 00787 00791 class DBClientBase : public DBClientWithCommands, public DBConnector { 00792 protected: 00793 WriteConcern _writeConcern; 00794 00795 public: 00796 DBClientBase() { 00797 _writeConcern = W_NORMAL; 00798 } 00799 00800 WriteConcern getWriteConcern() const { return _writeConcern; } 00801 void setWriteConcern( WriteConcern w ) { _writeConcern = w; } 00802 00817 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, 00818 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); 00819 00825 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ); 00826 00830 virtual void insert( const string &ns , BSONObj obj , int flags=0); 00831 00835 virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0); 00836 00841 virtual void remove( const string &ns , Query q , bool justOne = 0 ); 00842 00846 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false ); 00847 00848 virtual bool isFailed() const = 0; 00849 00850 virtual void killCursor( long long cursorID ) = 0; 00851 00852 virtual bool callRead( Message& toSend , Message& response ) = 0; 00853 // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed 00854 00855 virtual ConnectionString::ConnectionType type() const = 0; 00856 00857 virtual double getSoTimeout() const = 0; 00858 00859 }; // DBClientBase 00860 00861 class DBClientReplicaSet; 00862 00863 class ConnectException : public UserException { 00864 public: 00865 ConnectException(string msg) : UserException(9000,msg) { } 00866 }; 00867 00872 class DBClientConnection : public DBClientBase { 00873 public: 00880 DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) : 00881 clientSet(cp), _failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) { 00882 _numConnections++; 00883 } 00884 00885 virtual ~DBClientConnection() { 00886 _numConnections--; 00887 } 00888 00900 virtual bool connect(const char * hostname, string& errmsg) { 00901 // TODO: remove this method 00902 HostAndPort t( hostname ); 00903 return connect( t , errmsg ); 00904 } 00905 00915 virtual bool connect(const HostAndPort& server, string& errmsg); 00916 00925 void connect(const string& serverHostname) { 00926 string errmsg; 00927 if( !connect(HostAndPort(serverHostname), errmsg) ) 00928 throw ConnectException(string("can't connect ") + errmsg); 00929 } 00930 00931 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true, Auth::Level* level=NULL); 00932 00933 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0, 00934 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) { 00935 checkConnection(); 00936 return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize ); 00937 } 00938 00945 unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00946 unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00947 00948 virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); 00949 00954 bool isFailed() const { return _failed; } 00955 00956 MessagingPort& port() { assert(p); return *p; } 00957 00958 string toStringLong() const { 00959 stringstream ss; 00960 ss << _serverString; 00961 if ( _failed ) ss << " failed"; 00962 return ss.str(); 00963 } 00964 00966 string toString() { return _serverString; } 00967 00968 string getServerAddress() const { return _serverString; } 00969 00970 virtual void killCursor( long long cursorID ); 00971 virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); } 00972 virtual void say( Message &toSend, bool isRetry = false , string * actualServer = 0 ); 00973 virtual bool recv( Message& m ); 00974 virtual void checkResponse( const char *data, int nReturned, bool* retry = NULL, string* host = NULL ); 00975 virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 ); 00976 virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } 00977 void setSoTimeout(double to) { _so_timeout = to; } 00978 double getSoTimeout() const { return _so_timeout; } 00979 00980 virtual bool lazySupported() const { return true; } 00981 00982 static int getNumConnections() { 00983 return _numConnections; 00984 } 00985 00986 static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; } 00987 static bool getLazyKillCursor() { return _lazyKillCursor; } 00988 00989 protected: 00990 friend class SyncClusterConnection; 00991 virtual void sayPiggyBack( Message &toSend ); 00992 00993 DBClientReplicaSet *clientSet; 00994 boost::scoped_ptr<MessagingPort> p; 00995 boost::scoped_ptr<SockAddr> server; 00996 bool _failed; 00997 const bool autoReconnect; 00998 time_t lastReconnectTry; 00999 HostAndPort _server; // remember for reconnects 01000 string _serverString; 01001 void _checkConnection(); 01002 01003 // throws SocketException if in failed state and not reconnecting or if waiting to reconnect 01004 void checkConnection() { if( _failed ) _checkConnection(); } 01005 01006 map< string, pair<string,string> > authCache; 01007 double _so_timeout; 01008 bool _connect( string& errmsg ); 01009 01010 static AtomicUInt _numConnections; 01011 static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op 01012 01013 #ifdef MONGO_SSL 01014 static SSLManager* sslManager(); 01015 static SSLManager* _sslManager; 01016 #endif 01017 }; 01018 01021 bool serverAlive( const string &uri ); 01022 01023 DBClientBase * createDirectClient(); 01024 01025 BSONElement getErrField( const BSONObj& result ); 01026 bool hasErrField( const BSONObj& result ); 01027 01028 inline std::ostream& operator<<( std::ostream &s, const Query &q ) { 01029 return s << q.toString(); 01030 } 01031 01032 } // namespace mongo 01033 01034 #include "dbclientcursor.h" 01035 #include "dbclient_rs.h" 01036 #include "undef_macros.h"
1.8.0