|
MongoDB
2.0.3
|
00001 00006 /* Copyright 2009 10gen Inc. 00007 * 00008 * Licensed under the Apache License, Version 2.0 (the "License"); 00009 * you may not use this file except in compliance with the License. 00010 * You may obtain a copy of the License at 00011 * 00012 * http://www.apache.org/licenses/LICENSE-2.0 00013 * 00014 * Unless required by applicable law or agreed to in writing, software 00015 * distributed under the License is distributed on an "AS IS" BASIS, 00016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00017 * See the License for the specific language governing permissions and 00018 * limitations under the License. 00019 */ 00020 00021 #pragma once 00022 00023 #include "../pch.h" 00024 #include "../util/net/message.h" 00025 #include "../util/net/message_port.h" 00026 #include "../db/jsobj.h" 00027 #include "../db/json.h" 00028 #include <stack> 00029 00030 namespace mongo { 00031 00033 enum QueryOptions { 00042 QueryOption_CursorTailable = 1 << 1, 00043 00046 QueryOption_SlaveOk = 1 << 2, 00047 00048 // findingStart mode is used to find the first operation of interest when 00049 // we are scanning through a repl log. For efficiency in the common case, 00050 // where the first operation of interest is closer to the tail than the head, 00051 // we start from the tail of the log and work backwards until we find the 00052 // first operation of interest. Then we scan forward from that first operation, 00053 // actually returning results to the client. During the findingStart phase, 00054 // we release the db mutex occasionally to avoid blocking the db process for 00055 // an extended period of time. 00056 QueryOption_OplogReplay = 1 << 3, 00057 00061 QueryOption_NoCursorTimeout = 1 << 4, 00062 00066 QueryOption_AwaitData = 1 << 5, 00067 00075 QueryOption_Exhaust = 1 << 6, 00076 00081 QueryOption_PartialResults = 1 << 7 , 00082 00083 QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust | QueryOption_PartialResults 00084 00085 }; 00086 00087 enum UpdateOptions { 00089 UpdateOption_Upsert = 1 << 0, 00090 00093 UpdateOption_Multi = 1 << 1, 00094 00096 UpdateOption_Broadcast = 1 << 2 00097 }; 00098 00099 enum RemoveOptions { 00101 RemoveOption_JustOne = 1 << 0, 00102 00104 RemoveOption_Broadcast = 1 << 1 00105 }; 00106 00107 00111 enum InsertOptions { 00113 InsertOption_ContinueOnError = 1 << 0 00114 }; 00115 00116 class DBClientBase; 00117 00132 class ConnectionString { 00133 public: 00134 enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC }; 00135 00136 ConnectionString() { 00137 _type = INVALID; 00138 } 00139 00140 ConnectionString( const HostAndPort& server ) { 00141 _type = MASTER; 00142 _servers.push_back( server ); 00143 _finishInit(); 00144 } 00145 00146 ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) { 00147 _type = type; 00148 _setName = setName; 00149 _fillServers( s ); 00150 00151 switch ( _type ) { 00152 case MASTER: 00153 assert( _servers.size() == 1 ); 00154 break; 00155 case SET: 00156 assert( _setName.size() ); 00157 assert( _servers.size() >= 1 ); // 1 is ok since we can derive 00158 break; 00159 case PAIR: 00160 assert( _servers.size() == 2 ); 00161 break; 00162 default: 00163 assert( _servers.size() > 0 ); 00164 } 00165 00166 _finishInit(); 00167 } 00168 00169 ConnectionString( const string& s , ConnectionType favoredMultipleType ) { 00170 _type = INVALID; 00171 00172 _fillServers( s ); 00173 if ( _type != INVALID ) { 00174 // set already 00175 } 00176 else if ( _servers.size() == 1 ) { 00177 _type = MASTER; 00178 } 00179 else { 00180 _type = favoredMultipleType; 00181 assert( _type == SET || _type == SYNC ); 00182 } 00183 _finishInit(); 00184 } 00185 00186 bool isValid() const { return _type != INVALID; } 00187 00188 string toString() const { return _string; } 00189 00190 DBClientBase* connect( string& errmsg, double socketTimeout = 0 ) const; 00191 00192 string getSetName() const { return _setName; } 00193 00194 vector<HostAndPort> getServers() const { return _servers; } 00195 00196 ConnectionType type() const { return _type; } 00197 00198 static ConnectionString parse( const string& url , string& errmsg ); 00199 00200 static string typeToString( ConnectionType type ); 00201 00202 private: 00203 00204 void _fillServers( string s ); 00205 void _finishInit(); 00206 00207 ConnectionType _type; 00208 vector<HostAndPort> _servers; 00209 string _string; 00210 string _setName; 00211 }; 00212 00217 enum WriteConcern { 00218 W_NONE = 0 , // TODO: not every connection type fully supports this 00219 W_NORMAL = 1 00220 // TODO SAFE = 2 00221 }; 00222 00223 class BSONObj; 00224 class ScopedDbConnection; 00225 class DBClientCursor; 00226 class DBClientCursorBatchIterator; 00227 00233 class Query { 00234 public: 00235 BSONObj obj; 00236 Query() : obj(BSONObj()) { } 00237 Query(const BSONObj& b) : obj(b) { } 00238 Query(const string &json) : 00239 obj(fromjson(json)) { } 00240 Query(const char * json) : 00241 obj(fromjson(json)) { } 00242 00251 Query& sort(const BSONObj& sortPattern); 00252 00258 Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; } 00259 00265 Query& hint(BSONObj keyPattern); 00266 Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); } 00267 00271 Query& minKey(const BSONObj &val); 00275 Query& maxKey(const BSONObj &val); 00276 00280 Query& explain(); 00281 00290 Query& snapshot(); 00291 00308 Query& where(const string &jscode, BSONObj scope); 00309 Query& where(const string &jscode) { return where(jscode, BSONObj()); } 00310 00314 bool isComplex( bool * hasDollar = 0 ) const; 00315 00316 BSONObj getFilter() const; 00317 BSONObj getSort() const; 00318 BSONObj getHint() const; 00319 bool isExplain() const; 00320 00321 string toString() const; 00322 operator string() const { return toString(); } 00323 private: 00324 void makeComplex(); 00325 template< class T > 00326 void appendComplex( const char *fieldName, const T& val ) { 00327 makeComplex(); 00328 BSONObjBuilder b; 00329 b.appendElements(obj); 00330 b.append(fieldName, val); 00331 obj = b.obj(); 00332 } 00333 }; 00334 00338 #define QUERY(x) mongo::Query( BSON(x) ) 00339 00343 class DBConnector { 00344 public: 00345 virtual ~DBConnector() {} 00347 virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0; 00348 virtual void say( Message &toSend, bool isRetry = false ) = 0; 00349 virtual void sayPiggyBack( Message &toSend ) = 0; 00350 /* used by QueryOption_Exhaust. To use that your subclass must implement this. */ 00351 virtual bool recv( Message& m ) { assert(false); return false; } 00352 // In general, for lazy queries, we'll need to say, recv, then checkResponse 00353 virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ) { 00354 if( retry ) *retry = false; if( targetHost ) *targetHost = ""; 00355 } 00356 virtual bool lazySupported() const = 0; 00357 }; 00358 00362 class DBClientInterface : boost::noncopyable { 00363 public: 00364 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, 00365 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0; 00366 00367 virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0; 00368 00369 virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0; 00370 00371 virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0; 00372 00373 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0; 00374 00375 virtual ~DBClientInterface() { } 00376 00381 virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00382 00386 void findN(vector<BSONObj>& out, const string&ns, Query query, int nToReturn, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00387 00388 virtual string getServerAddress() const = 0; 00389 00391 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; 00392 }; 00393 00398 class DBClientWithCommands : public DBClientInterface { 00399 set<string> _seenIndexes; 00400 public: 00402 int _logLevel; 00403 00404 DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { } 00405 00412 bool simpleCommand(const string &dbname, BSONObj *info, const string &command); 00413 00425 virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); 00426 00435 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); 00436 00440 virtual unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0, int limit=0, int skip=0 ); 00441 00442 string createPasswordDigest( const string &username , const string &clearTextPassword ); 00443 00452 virtual bool isMaster(bool& isMaster, BSONObj *info=0); 00453 00470 bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0); 00471 00475 string getLastError(); 00476 00480 virtual BSONObj getLastErrorDetailed(); 00481 00485 static string getLastErrorString( const BSONObj& res ); 00486 00493 BSONObj getPrevError(); 00494 00499 bool resetError() { return simpleCommand("admin", 0, "reseterror"); } 00500 00502 virtual bool dropCollection( const string &ns ) { 00503 string db = nsGetDB( ns ); 00504 string coll = nsGetCollection( ns ); 00505 uassert( 10011 , "no collection name", coll.size() ); 00506 00507 BSONObj info; 00508 00509 bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info ); 00510 resetIndexCache(); 00511 return res; 00512 } 00513 00517 bool repairDatabase(const string &dbname, BSONObj *info = 0) { 00518 return simpleCommand(dbname, info, "repairDatabase"); 00519 } 00520 00540 bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0); 00541 00546 enum ProfilingLevel { 00547 ProfileOff = 0, 00548 ProfileSlow = 1, // log very slow (>100ms) operations 00549 ProfileAll = 2 00550 00551 }; 00552 bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); 00553 bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); 00554 00555 00559 struct MROutput { 00560 MROutput(const char* collection) : out(BSON("replace" << collection)) {} 00561 MROutput(const string& collection) : out(BSON("replace" << collection)) {} 00562 MROutput(const BSONObj& obj) : out(obj) {} 00563 00564 BSONObj out; 00565 }; 00566 static MROutput MRInline; 00567 00591 BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), MROutput output = MRInline); 00592 00608 bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0); 00609 00613 bool validate( const string &ns , bool scandata=true ) { 00614 BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata ); 00615 BSONObj info; 00616 return runCommand( nsGetDB( ns ).c_str() , cmd , info ); 00617 } 00618 00619 /* The following helpers are simply more convenient forms of eval() for certain common cases */ 00620 00621 /* invocation with no return value of interest -- with or without one simple parameter */ 00622 bool eval(const string &dbname, const string &jscode); 00623 template< class T > 00624 bool eval(const string &dbname, const string &jscode, T parm1) { 00625 BSONObj info; 00626 BSONElement retValue; 00627 BSONObjBuilder b; 00628 b.append("0", parm1); 00629 BSONObj args = b.done(); 00630 return eval(dbname, jscode, info, retValue, &args); 00631 } 00632 00634 template< class T, class NumType > 00635 bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) { 00636 BSONObj info; 00637 BSONElement retValue; 00638 BSONObjBuilder b; 00639 b.append("0", parm1); 00640 BSONObj args = b.done(); 00641 if ( !eval(dbname, jscode, info, retValue, &args) ) 00642 return false; 00643 ret = (NumType) retValue.number(); 00644 return true; 00645 } 00646 00652 list<string> getDatabaseNames(); 00653 00657 list<string> getCollectionNames( const string& db ); 00658 00659 bool exists( const string& ns ); 00660 00674 virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "", 00675 bool cache = true, bool background = false, int v = -1 ); 00676 00680 virtual void resetIndexCache(); 00681 00682 virtual auto_ptr<DBClientCursor> getIndexes( const string &ns ); 00683 00684 virtual void dropIndex( const string& ns , BSONObj keys ); 00685 virtual void dropIndex( const string& ns , const string& indexName ); 00686 00690 virtual void dropIndexes( const string& ns ); 00691 00692 virtual void reIndex( const string& ns ); 00693 00694 string genIndexName( const BSONObj& keys ); 00695 00697 virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) { 00698 bool ret = simpleCommand(dbname, info, "dropDatabase"); 00699 resetIndexCache(); 00700 return ret; 00701 } 00702 00703 virtual string toString() = 0; 00704 00706 string nsGetDB( const string &ns ) { 00707 string::size_type pos = ns.find( "." ); 00708 if ( pos == string::npos ) 00709 return ns; 00710 00711 return ns.substr( 0 , pos ); 00712 } 00713 00715 string nsGetCollection( const string &ns ) { 00716 string::size_type pos = ns.find( "." ); 00717 if ( pos == string::npos ) 00718 return ""; 00719 00720 return ns.substr( pos + 1 ); 00721 } 00722 00723 protected: 00725 bool isOk(const BSONObj&); 00726 00728 bool isNotMasterErrorString( const BSONElement& e ); 00729 00730 BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip ); 00731 00732 enum QueryOptions availableOptions(); 00733 00734 private: 00735 enum QueryOptions _cachedAvailableOptions; 00736 bool _haveCachedAvailableOptions; 00737 }; 00738 00742 class DBClientBase : public DBClientWithCommands, public DBConnector { 00743 protected: 00744 WriteConcern _writeConcern; 00745 00746 public: 00747 DBClientBase() { 00748 _writeConcern = W_NORMAL; 00749 } 00750 00751 WriteConcern getWriteConcern() const { return _writeConcern; } 00752 void setWriteConcern( WriteConcern w ) { _writeConcern = w; } 00753 00768 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, 00769 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); 00770 00776 virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ); 00777 00781 virtual void insert( const string &ns , BSONObj obj , int flags=0); 00782 00786 virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0); 00787 00792 virtual void remove( const string &ns , Query q , bool justOne = 0 ); 00793 00797 virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false ); 00798 00799 virtual bool isFailed() const = 0; 00800 00801 virtual void killCursor( long long cursorID ) = 0; 00802 00803 virtual bool callRead( Message& toSend , Message& response ) = 0; 00804 // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed 00805 00806 virtual ConnectionString::ConnectionType type() const = 0; 00807 00808 virtual double getSoTimeout() const = 0; 00809 00810 }; // DBClientBase 00811 00812 class DBClientReplicaSet; 00813 00814 class ConnectException : public UserException { 00815 public: 00816 ConnectException(string msg) : UserException(9000,msg) { } 00817 }; 00818 00823 class DBClientConnection : public DBClientBase { 00824 public: 00831 DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) : 00832 clientSet(cp), _failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) { 00833 _numConnections++; 00834 } 00835 00836 virtual ~DBClientConnection() { 00837 _numConnections--; 00838 } 00839 00851 virtual bool connect(const char * hostname, string& errmsg) { 00852 // TODO: remove this method 00853 HostAndPort t( hostname ); 00854 return connect( t , errmsg ); 00855 } 00856 00866 virtual bool connect(const HostAndPort& server, string& errmsg); 00867 00876 void connect(const string& serverHostname) { 00877 string errmsg; 00878 if( !connect(HostAndPort(serverHostname), errmsg) ) 00879 throw ConnectException(string("can't connect ") + errmsg); 00880 } 00881 00882 virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); 00883 00884 virtual auto_ptr<DBClientCursor> query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0, 00885 const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) { 00886 checkConnection(); 00887 return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize ); 00888 } 00889 00896 unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00897 unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); 00898 00899 virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); 00900 00905 bool isFailed() const { return _failed; } 00906 00907 MessagingPort& port() { assert(p); return *p; } 00908 00909 string toStringLong() const { 00910 stringstream ss; 00911 ss << _serverString; 00912 if ( _failed ) ss << " failed"; 00913 return ss.str(); 00914 } 00915 00917 string toString() { return _serverString; } 00918 00919 string getServerAddress() const { return _serverString; } 00920 00921 virtual void killCursor( long long cursorID ); 00922 virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); } 00923 virtual void say( Message &toSend, bool isRetry = false ); 00924 virtual bool recv( Message& m ); 00925 virtual void checkResponse( const char *data, int nReturned, bool* retry = NULL, string* host = NULL ); 00926 virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 ); 00927 virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } 00928 void setSoTimeout(double to) { _so_timeout = to; } 00929 double getSoTimeout() const { return _so_timeout; } 00930 00931 virtual bool lazySupported() const { return true; } 00932 00933 static int getNumConnections() { 00934 return _numConnections; 00935 } 00936 00937 static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; } 00938 static bool getLazyKillCursor() { return _lazyKillCursor; } 00939 00940 protected: 00941 friend class SyncClusterConnection; 00942 virtual void sayPiggyBack( Message &toSend ); 00943 00944 DBClientReplicaSet *clientSet; 00945 boost::scoped_ptr<MessagingPort> p; 00946 boost::scoped_ptr<SockAddr> server; 00947 bool _failed; 00948 const bool autoReconnect; 00949 time_t lastReconnectTry; 00950 HostAndPort _server; // remember for reconnects 00951 string _serverString; 00952 void _checkConnection(); 00953 00954 // throws SocketException if in failed state and not reconnecting or if waiting to reconnect 00955 void checkConnection() { if( _failed ) _checkConnection(); } 00956 00957 map< string, pair<string,string> > authCache; 00958 double _so_timeout; 00959 bool _connect( string& errmsg ); 00960 00961 static AtomicUInt _numConnections; 00962 static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op 00963 00964 #ifdef MONGO_SSL 00965 static SSLManager* sslManager(); 00966 static SSLManager* _sslManager; 00967 #endif 00968 }; 00969 00972 bool serverAlive( const string &uri ); 00973 00974 DBClientBase * createDirectClient(); 00975 00976 BSONElement getErrField( const BSONObj& result ); 00977 bool hasErrField( const BSONObj& result ); 00978 00979 } // namespace mongo 00980 00981 #include "dbclientcursor.h" 00982 #include "dbclient_rs.h" 00983 #include "undef_macros.h"
1.8.0