MongoDB  2.1.1-pre-
dbclient.h
Go to the documentation of this file.
00001 
00006 /*    Copyright 2009 10gen Inc.
00007  *
00008  *    Licensed under the Apache License, Version 2.0 (the "License");
00009  *    you may not use this file except in compliance with the License.
00010  *    You may obtain a copy of the License at
00011  *
00012  *    http://www.apache.org/licenses/LICENSE-2.0
00013  *
00014  *    Unless required by applicable law or agreed to in writing, software
00015  *    distributed under the License is distributed on an "AS IS" BASIS,
00016  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00017  *    See the License for the specific language governing permissions and
00018  *    limitations under the License.
00019  */
00020 
00021 #pragma once
00022 
00023 #include "../pch.h"
00024 #include "../util/net/message.h"
00025 #include "../util/net/message_port.h"
00026 #include "../db/jsobj.h"
00027 #include "../db/json.h"
00028 #include "../db/security.h"
00029 #include <stack>
00030 
00031 namespace mongo {
00032 
00034     enum QueryOptions {
              // Bit flags that can be combined into a single integer. Every named
              // option below is a distinct power of two; bit 0 (1 << 0) is not given
              // a name in this enum.
              // NOTE(review): presumably these are the values accepted by the
              // `int queryOptions` parameters of the query() methods -- confirm.
00043         QueryOption_CursorTailable = 1 << 1, // bit 1; NOTE(review): presumably keeps the cursor open once the last data is returned -- confirm
00044 
00047         QueryOption_SlaveOk = 1 << 2, // bit 2; NOTE(review): presumably allows the query to run on a non-master node -- confirm
00048 
00049         // findingStart mode is used to find the first operation of interest when
00050         // we are scanning through a repl log.  For efficiency in the common case,
00051         // where the first operation of interest is closer to the tail than the head,
00052         // we start from the tail of the log and work backwards until we find the
00053         // first operation of interest.  Then we scan forward from that first operation,
00054         // actually returning results to the client.  During the findingStart phase,
00055         // we release the db mutex occasionally to avoid blocking the db process for
00056         // an extended period of time.
00057         QueryOption_OplogReplay = 1 << 3, // bit 3
00058 
00062         QueryOption_NoCursorTimeout = 1 << 4, // bit 4; NOTE(review): presumably disables the server's idle-cursor timeout -- confirm
00063 
00067         QueryOption_AwaitData = 1 << 5, // bit 5; NOTE(review): presumably makes a tailable cursor wait for new data -- confirm
00068 
00076         QueryOption_Exhaust = 1 << 6, // bit 6; the comment on DBConnector::recv() in this file says recv() is used by this option
00077 
00082         QueryOption_PartialResults = 1 << 7 , // bit 7; NOTE(review): exact semantics not visible in this listing
00083 
              // Bitwise OR of the seven flags declared above.
00084         QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust | QueryOption_PartialResults
00085 
00086     };
00087 
00088     enum UpdateOptions {
              // Bit flags for update operations; the three values are distinct powers
              // of two so they can be OR-ed together.
00090         UpdateOption_Upsert = 1 << 0, // bit 0; NOTE(review): presumably corresponds to the `upsert` argument of update() -- confirm
00091 
00094         UpdateOption_Multi = 1 << 1, // bit 1; NOTE(review): presumably corresponds to the `multi` argument of update() -- confirm
00095 
00097         UpdateOption_Broadcast = 1 << 2 // bit 2; meaning is not evident from this header
00098     };
00099 
00100     enum RemoveOptions {
              // Bit flags for remove operations; the two values are distinct powers of two.
00102         RemoveOption_JustOne = 1 << 0, // bit 0; NOTE(review): presumably corresponds to the `justOne` argument of remove() -- confirm
00103 
00105         RemoveOption_Broadcast = 1 << 1 // bit 1; meaning is not evident from this header
00106     };
00107 
00108     
00112     enum InsertOptions {
              // Bit flags for insert operations (only one is defined here).
              // NOTE(review): presumably passed through the `int flags` parameter of
              // the insert() methods declared in this header -- confirm.
00114         InsertOption_ContinueOnError = 1 << 0 // bit 0
00115     };
00116 
00117     class DBClientBase;
00118 
00133     class ConnectionString {
              // Describes where to connect: a connection type, the list of servers,
              // an optional set name, and a string form returned by toString().
              // _fillServers(), _finishInit(), connect(), parse() and typeToString()
              // are only declared here; their definitions are not part of this header.
00134     public:
00135         enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC };
00136 
              // Default state: type INVALID and no servers, so isValid() is false.
00137         ConnectionString() {
00138             _type = INVALID;
00139         }
00140 
              // Single-server form: type MASTER with exactly the given server.
              // Not marked explicit, so a HostAndPort converts implicitly.
00141         ConnectionString( const HostAndPort& server ) {
00142             _type = MASTER;
00143             _servers.push_back( server );
00144             _finishInit();
00145         }
00146 
              // Explicit-type form. `s` is handed to _fillServers() to populate _servers;
              // the resulting server count is then checked against the requested type.
00147         ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) {
00148             _type = type;
00149             _setName = setName;
00150             _fillServers( s );
00151 
00152             switch ( _type ) {
00153             case MASTER:
                      // exactly one server
00154                 assert( _servers.size() == 1 );
00155                 break;
00156             case SET:
                      // a set must be named and have at least one server
00157                 assert( _setName.size() );
00158                 assert( _servers.size() >= 1 ); // 1 is ok since we can derive
00159                 break;
00160             case PAIR:
                      // exactly two servers
00161                 assert( _servers.size() == 2 );
00162                 break;
00163             default:
                      // every other type (SYNC, INVALID) only needs a non-empty list
00164                 assert( _servers.size() > 0 );
00165             }
00166 
00167             _finishInit();
00168         }
00169 
              // Type-inferring form. _type starts as INVALID and is re-checked after
              // _fillServers(); that check only makes sense if _fillServers() can set
              // _type itself (NOTE(review): confirm in its definition). Otherwise one
              // server means MASTER, and several servers take favoredMultipleType,
              // which must be SET or SYNC.
00170         ConnectionString( const string& s , ConnectionType favoredMultipleType ) {
00171             _type = INVALID;
00172             
00173             _fillServers( s );
00174             if ( _type != INVALID ) {
00175                 // set already
00176             }
00177             else if ( _servers.size() == 1 ) {
00178                 _type = MASTER;
00179             }
00180             else {
00181                 _type = favoredMultipleType;
00182                 assert( _type == SET || _type == SYNC );
00183             }
00184             _finishInit();
00185         }
00186 
              // True unless the type is INVALID.
00187         bool isValid() const { return _type != INVALID; }
00188 
              // Returns a copy of _string. NOTE(review): _string is never assigned in
              // this header; presumably _finishInit() builds it -- confirm.
00189         string toString() const { return _string; }
00190         
              // Declared only. Returns a raw DBClientBase*; ownership and the failure
              // value are not stated here. NOTE(review): presumably errmsg is filled
              // in on failure -- confirm.
00191         DBClientBase* connect( string& errmsg, double socketTimeout = 0 ) const;
00192 
00193         string getSetName() const { return _setName; }
00194 
              // Returns a copy of the server list.
00195         vector<HostAndPort> getServers() const { return _servers; }
00196         
00197         ConnectionType type() const { return _type; }
00198 
              // Declared only: builds a ConnectionString from a url string.
00199         static ConnectionString parse( const string& url , string& errmsg );
00200 
              // Declared only: string form of a ConnectionType value.
00201         static string typeToString( ConnectionType type );
00202 
00203     private:
00204 
              // Both helpers are defined elsewhere. _fillServers takes its argument by value.
00205         void _fillServers( string s );
00206         void _finishInit();
00207 
00208         ConnectionType _type;
00209         vector<HostAndPort> _servers;
00210         string _string;
00211         string _setName;
00212     };
00213 
00218     enum WriteConcern {
              // Write-concern levels. DBClientBase in this file stores one of these
              // and starts out with W_NORMAL.
00219         W_NONE = 0 , // TODO: not every connection type fully supports this
00220         W_NORMAL = 1
00221         // TODO SAFE = 2
00222     };
00223 
00224     class BSONObj;
00225     class ScopedDbConnection;
00226     class DBClientCursor;
00227     class DBClientCursorBatchIterator;
00228 
00234     class Query {
              // Wrapper around a single BSONObj (`obj`) that represents a query.
              // The modifier methods return Query& so calls can be chained; most of
              // them are only declared here and defined elsewhere.
00235     public:
00236         BSONObj obj;
              // Empty query.
00237         Query() : obj(BSONObj()) { }
              // Wraps an existing BSONObj. Not explicit, so a BSONObj converts implicitly.
00238         Query(const BSONObj& b) : obj(b) { }
              // The two constructors below parse JSON text with fromjson().
00239         Query(const string &json) :
00240             obj(fromjson(json)) { }
00241         Query(const char * json) :
00242             obj(fromjson(json)) { }
00243 
00252         Query& sort(const BSONObj& sortPattern);
00253 
              // Convenience overload: builds the one-field pattern { field : asc } and
              // forwards it to sort(const BSONObj&).
00259         Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; }
00260 
00266         Query& hint(BSONObj keyPattern);
              // Convenience overload: parses the JSON key pattern and forwards it.
00267         Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); }
00268 
00272         Query& minKey(const BSONObj &val);
00276         Query& maxKey(const BSONObj &val);
00277 
00281         Query& explain();
00282 
00291         Query& snapshot();
00292 
00309         Query& where(const string &jscode, BSONObj scope);
              // Convenience overload: same as above with an empty scope object.
00310         Query& where(const string &jscode) { return where(jscode, BSONObj()); }
00311 
              // Declared only. hasDollar is an optional out-parameter (may be null).
00315         bool isComplex( bool * hasDollar = 0 ) const;
00316 
              // Read accessors, declared only. QuerySpec in this file forwards its
              // filter()/sort()/hint()/isExplain() to them.
00317         BSONObj getFilter() const;
00318         BSONObj getSort() const;
00319         BSONObj getHint() const;
00320         bool isExplain() const;
00321 
00322         string toString() const;
              // Implicit conversion to string; same result as toString().
00323         operator string() const { return toString(); }
00324     private:
              // Declared only. NOTE(review): presumably converts `obj` into the form
              // that can carry extra top-level fields -- confirm in the definition.
00325         void makeComplex();
              // Calls makeComplex() first, then rebuilds `obj` as all of its current
              // elements followed by one extra field (fieldName : val).
00326         template< class T >
00327         void appendComplex( const char *fieldName, const T& val ) {
00328             makeComplex();
00329             BSONObjBuilder b;
00330             b.appendElements(obj);
00331             b.append(fieldName, val);
00332             obj = b.obj();
00333         }
00334     };
00335 
00340     class QuerySpec {
              // Value object bundling the parameters of one query: target namespace,
              // skip and return counts, option bits, the query object and the field
              // selector. _queryObj is a Query built from _query and is what answers
              // isExplain(), filter(), hint() and sort().
00341 
00342         string _ns;
00343         int _ntoskip;
00344         int _ntoreturn;
00345         int _options;
00346         BSONObj _query;
00347         BSONObj _fields;
00348         Query _queryObj;
00349 
00350     public:
00351         
              // Stores getOwned() copies of `query` and `fields`. _queryObj is
              // initialized from _query, which is declared (and therefore
              // initialized) before it.
00352         QuerySpec( const string& ns,
00353                    const BSONObj& query, const BSONObj& fields,
00354                    int ntoskip, int ntoreturn, int options )
00355             : _ns( ns ), _ntoskip( ntoskip ), _ntoreturn( ntoreturn ), _options( options ),
00356               _query( query.getOwned() ), _fields( fields.getOwned() ) , _queryObj( _query ) {
00357         }
00358 
              // Creates an empty spec (isEmpty() returns true). The int members are
              // zeroed explicitly: left uninitialized, ntoskip(), ntoreturn(),
              // options() and toString() would read indeterminate values.
00359         QuerySpec() : _ntoskip( 0 ), _ntoreturn( 0 ), _options( 0 ) {}
00360 
              // True when no namespace has been set.
00361         bool isEmpty() const { return _ns.size() == 0; }
00362 
              // The four accessors below forward to the wrapped Query object.
00363         bool isExplain() const { return _queryObj.isExplain(); }
00364         BSONObj filter() const { return _queryObj.getFilter(); }
00365 
00366         BSONObj hint() const { return _queryObj.getHint(); }
00367         BSONObj sort() const { return _queryObj.getSort(); }
              // The stored query and field selector, returned by value.
00368         BSONObj query() const { return _query; }
00369         BSONObj fields() const { return _fields; }
              // Mutable pointer to the stored field selector.
00370         BSONObj* fieldsData() { return &_fields; }
00371 
00372         // don't love this, but needed downstream
00373         const BSONObj* fieldsPtr() const { return &_fields; } 
00374 
00375         string ns() const { return _ns; }
00376         int ntoskip() const { return _ntoskip; }
00377         int ntoreturn() const { return _ntoreturn; }
00378         int options() const { return _options; }
00379         
              // Replaces the field selector with a getOwned() copy of `o`. Takes a
              // const reference because `o` is only read; every call that compiled
              // against the former non-const reference still compiles.
00380         void setFields( const BSONObj& o ) { _fields = o.getOwned(); }
00381 
              // Debug string: "QSpec " followed by a BSON document listing every member.
00382         string toString() const {
00383             return str::stream() << "QSpec " << 
00384                 BSON( "ns" << _ns << "n2skip" << _ntoskip << "n2return" << _ntoreturn << "options" << _options
00385                       << "query" << _query << "fields" << _fields );
00386         }
00387         
00388     };
00389 
00390 
00394 #define QUERY(x) mongo::Query( BSON(x) )
      // QUERY(x) expands to a mongo::Query constructed from BSON(x), i.e. it wraps
      // the BSON builder expression `x` in a Query object. The type is spelled with
      // the full mongo:: qualification.
00395 
00396     // Useful utilities for namespaces
00398     string nsGetDB( const string &ns ); // declared only; dropCollection() and validate() in this file pass the result to runCommand() as the database name
00399 
00401     string nsGetCollection( const string &ns ); // declared only; dropCollection() and validate() in this file use the result as the collection name in the command object
00402 
00406     class DBConnector {
              // Abstract transport interface: sending a message (say / sayPiggyBack),
              // sending one and collecting the reply (call), and the optional
              // receive / response-check steps used by lazy queries.
00407     public:
00408         virtual ~DBConnector() { }
              // Sends toSend and fills response; pure virtual.
00410         virtual bool call( Message &toSend, Message &response, bool assertOk = true, string * actualServer = 0 ) = 0;
              // Sends toSend without collecting a reply; pure virtual.
00411         virtual void say( Message &toSend, bool isRetry = false, string * actualServer = 0 ) = 0;
00412         virtual void sayPiggyBack( Message &toSend ) = 0;
00413         /* Needed for QueryOption_Exhaust: a subclass that wants to support that
                 option has to override this. The default asserts and returns false. */
00414         virtual bool recv( Message& m ) {
                  assert(false);
                  return false;
              }
00415         // Lazy queries are driven as say(), then recv(), then checkResponse().
              // The default implementation reports "no retry" and an empty target
              // host through whichever out-parameters were supplied.
00416         virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ) {
00417             if( retry ) {
                      *retry = false;
                  }
                  if( targetHost ) {
                      *targetHost = "";
                  }
00418         }
              // Whether this connector supports the lazy say/recv/checkResponse flow.
00419         virtual bool lazySupported() const = 0;
00420     };
00421 
00425     class DBClientInterface : boost::noncopyable {
              // Abstract interface for the basic data operations: query, insert,
              // remove, update, getMore. It inherits (privately, the class default)
              // from boost::noncopyable. Only findOne() and findN() are not pure
              // virtual; their definitions are not part of this header.
00426     public:
              // Runs a query and returns a cursor over the results.
00427         virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
00428                                                const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0;
00429 
              // Inserts a single object.
00430         virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0;
00431 
              // Inserts a vector of objects.
00432         virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0;
00433 
              // justOne defaults to 0, i.e. false.
00434         virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;
00435 
              // upsert and multi default to 0, i.e. false.
00436         virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0;
00437 
00438         virtual ~DBClientInterface() { }
00439 
              // Declared only; virtual but not pure. Returns one BSONObj by value.
00444         virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00445 
              // Declared only; non-virtual. Results are delivered through `out`.
00449         void findN(vector<BSONObj>& out, const string&ns, Query query, int nToReturn, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00450 
00451         virtual string getServerAddress() const = 0;
00452 
              // Returns a cursor for an existing cursor id.
00454         virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
00455     };
00456 
00461     class DBClientWithCommands : public DBClientInterface {
              // Adds database-command helpers on top of DBClientInterface. Most
              // members are only declared here; the inline ones are thin wrappers
              // around simpleCommand(), runCommand() and eval().
              // NOTE(review): _seenIndexes is not touched in this header; presumably
              // it is the cache that resetIndexCache() clears -- confirm.
00462         set<string> _seenIndexes;
00463     public:
00465         int _logLevel;
00466 
              // Log level 0, and no cached set of available query options yet.
00467         DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { }
00468 
              // Declared only. `info` is a pointer; resetError() below passes 0 for it.
00475         bool simpleCommand(const string &dbname, BSONObj *info, const string &command);
00476 
              // Declared only. Runs `cmd` against `dbname`; the reply goes into `info`.
00488         virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
00489 
00499         virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true, Auth::Level * level = NULL);
00500 
00504         virtual unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0, int limit=0, int skip=0 );
00505 
00506         string createPasswordDigest( const string &username , const string &clearTextPassword );
00507 
00516         virtual bool isMaster(bool& isMaster, BSONObj *info=0);
00517 
00534         bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
00535 
00539         string getLastError(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0);
00540 
00547         virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0);
00548 
00552         static string getLastErrorString( const BSONObj& res );
00553 
00560         BSONObj getPrevError();
00561 
              // Runs the "reseterror" command on the "admin" database with no info object.
00566         bool resetError() { return simpleCommand("admin", 0, "reseterror"); }
00567 
              // Splits ns into database and collection, raises uassert 10011 when the
              // collection part is empty, runs { drop : <collection> } on the database,
              // then resets the index cache whether or not the command succeeded.
              // Returns the command's result; the reply object is discarded.
00569         virtual bool dropCollection( const string &ns ) {
00570             string db = nsGetDB( ns );
00571             string coll = nsGetCollection( ns );
00572             uassert( 10011 ,  "no collection name", coll.size() );
00573 
00574             BSONObj info;
00575 
00576             bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info );
00577             resetIndexCache();
00578             return res;
00579         }
00580 
              // Runs the "repairDatabase" command on dbname.
00584         bool repairDatabase(const string &dbname, BSONObj *info = 0) {
00585             return simpleCommand(dbname, info, "repairDatabase");
00586         }
00587 
00607         bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0);
00608 
00613         enum ProfilingLevel {
00614             ProfileOff = 0,
00615             ProfileSlow = 1, // log very slow (>100ms) operations
00616             ProfileAll = 2
00617 
00618         };
00619         bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0);
00620         bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0);
00621 
00622 
              // Output specification for mapreduce(). A collection name given as
              // const char* or string becomes { replace : <name> }; a BSONObj is
              // stored as-is. The constructors are not explicit, so any of the three
              // can be passed directly where an MROutput is expected.
00626         struct MROutput {
00627             MROutput(const char* collection) : out(BSON("replace" << collection)) {}
00628             MROutput(const string& collection) : out(BSON("replace" << collection)) {}
00629             MROutput(const BSONObj& obj) : out(obj) {}
00630 
00631             BSONObj out;
00632         };
              // Default `output` argument of mapreduce(); defined elsewhere.
00633         static MROutput MRInline;
00634 
00658         BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), MROutput output = MRInline);
00659 
              // Declared only. Full form of eval: the reply goes into `info`, the
              // returned element into `retValue`; `args` is optional.
00675         bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0);
00676 
              // Runs { validate : <collection>, scandata : <scandata> } on the
              // database part of ns. Only the success flag is returned; the reply
              // object is discarded.
00680         bool validate( const string &ns , bool scandata=true ) {
00681             BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata );
00682             BSONObj info;
00683             return runCommand( nsGetDB( ns ).c_str() , cmd , info );
00684         }
00685 
00686         /* The following helpers are simply more convenient forms of eval() for certain common cases */
00687 
00688         /* invocation with no return value of interest -- with or without one simple parameter */
00689         bool eval(const string &dbname, const string &jscode);
              // One-parameter form: parm1 is packed as field "0" of the args object.
              // NOTE(review): args comes from b.done(); presumably it refers to the
              // builder's buffer, so b must stay alive for the nested eval() call
              // (it does here) -- confirm against BSONObjBuilder.
00690         template< class T >
00691         bool eval(const string &dbname, const string &jscode, T parm1) {
00692             BSONObj info;
00693             BSONElement retValue;
00694             BSONObjBuilder b;
00695             b.append("0", parm1);
00696             BSONObj args = b.done();
00697             return eval(dbname, jscode, info, retValue, &args);
00698         }
00699 
              // As above, but on success also stores retValue.number(), cast to
              // NumType, into `ret`. On failure it returns false and leaves `ret` untouched.
00701         template< class T, class NumType >
00702         bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) {
00703             BSONObj info;
00704             BSONElement retValue;
00705             BSONObjBuilder b;
00706             b.append("0", parm1);
00707             BSONObj args = b.done();
00708             if ( !eval(dbname, jscode, info, retValue, &args) )
00709                 return false;
00710             ret = (NumType) retValue.number();
00711             return true;
00712         }
00713 
00719         list<string> getDatabaseNames();
00720 
00724         list<string> getCollectionNames( const string& db );
00725 
00726         bool exists( const string& ns );
00727 
00741         virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "",
00742                                   bool cache = true, bool background = false, int v = -1 );
00743 
              // Declared only; called by dropCollection() and dropDatabase() in this class.
00747         virtual void resetIndexCache();
00748 
00749         virtual auto_ptr<DBClientCursor> getIndexes( const string &ns );
00750 
              // An index can be dropped either by its key pattern or by its name.
00751         virtual void dropIndex( const string& ns , BSONObj keys );
00752         virtual void dropIndex( const string& ns , const string& indexName );
00753 
00757         virtual void dropIndexes( const string& ns );
00758 
00759         virtual void reIndex( const string& ns );
00760 
00761         string genIndexName( const BSONObj& keys );
00762 
              // Runs the "dropDatabase" command on dbname, then resets the index cache
              // whether or not the command succeeded.
00764         virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) {
00765             bool ret = simpleCommand(dbname, info, "dropDatabase");
00766             resetIndexCache();
00767             return ret;
00768         }
00769 
              // Pure virtual and non-const.
00770         virtual string toString() = 0;
00771 
00772     protected:
              // Helpers declared only; the definitions are not part of this header.
00774         bool isOk(const BSONObj&);
00775 
00777         bool isNotMasterErrorString( const BSONElement& e );
00778 
00779         BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip );
00780 
00781         enum QueryOptions availableOptions();
00782 
00783     private:
              // NOTE(review): presumably a cache for availableOptions(); the
              // constructor starts them as 0 / false -- confirm in the definition.
00784         enum QueryOptions _cachedAvailableOptions;
00785         bool _haveCachedAvailableOptions;
00786     };
00787 
00791     class DBClientBase : public DBClientWithCommands, public DBConnector {
              // Abstract base for concrete clients: joins the command helpers of
              // DBClientWithCommands with the transport interface of DBConnector,
              // declares overrides of the data operations from DBClientInterface,
              // and carries a write-concern setting.
00792     protected:
00793         WriteConcern _writeConcern;
00794 
00795     public:
              // The write concern starts out as W_NORMAL.
00796         DBClientBase() : _writeConcern( W_NORMAL ) { }
00799 
              // Accessors for the stored write concern.
00800         WriteConcern getWriteConcern() const {
                  return _writeConcern;
              }
00801         void setWriteConcern( WriteConcern w ) {
                  _writeConcern = w;
              }
00802 
              // Declaration of the DBClientInterface::query override; defined elsewhere.
00817         virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
00818                                                const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 );
00819 
              // Declaration of the DBClientInterface::getMore override; defined elsewhere.
00825         virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 );
00826 
              // Single-object insert.
00830         virtual void insert( const string &ns , BSONObj obj , int flags=0);
00831 
              // Multi-object insert.
00835         virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
00836 
00841         virtual void remove( const string &ns , Query q , bool justOne = 0 );
00842 
00846         virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false );
00847 
              // Everything from here down is pure virtual and supplied by concrete clients.
00848         virtual bool isFailed() const = 0;
00849 
00850         virtual void killCursor( long long cursorID ) = 0;
00851 
00852         virtual bool callRead( Message& toSend , Message& response ) = 0;
00853         // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed
00854         
00855         virtual ConnectionString::ConnectionType type() const = 0;
00856         
00857         virtual double getSoTimeout() const = 0;
00858 
00859     }; // DBClientBase
00860 
00861     class DBClientReplicaSet;
00862 
00863     class ConnectException : public UserException {
              // Exception type thrown by DBClientConnection::connect(const string&)
              // in this file when a connection attempt fails.
00864     public:
              // Passes the fixed code 9000 and the caller's message to UserException.
00865         ConnectException(string msg)
                  : UserException(9000, msg) {
              }
00866     };
00867 
00872     class DBClientConnection : public DBClientBase {
              // Concrete client for a single server (type() reports MASTER). Keeps a
              // class-wide count of live instances in the static _numConnections.
00873     public:
              // Stores the reconnect preference, the optional owning replica-set
              // client and the socket timeout, starts in the not-failed state, and
              // increments the live-connection counter.
00880         DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) :
00881             clientSet(cp), _failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
00882             _numConnections++;
00883         }
00884 
              // Decrements the live-connection counter.
00885         virtual ~DBClientConnection() {
00886             _numConnections--;
00887         }
00888 
              // Wraps hostname in a HostAndPort and forwards to the overload below.
00900         virtual bool connect(const char * hostname, string& errmsg) {
00901             // TODO: remove this method
00902             HostAndPort t( hostname );
00903             return connect( t , errmsg );
00904         }
00905 
              // Declared only. Returns a bool and takes an errmsg out-parameter.
00915         virtual bool connect(const HostAndPort& server, string& errmsg);
00916 
              // Throwing variant: when the bool-returning connect() fails, throws
              // ConnectException with the text "can't connect " followed by errmsg.
00925         void connect(const string& serverHostname) {
00926             string errmsg;
00927             if( !connect(HostAndPort(serverHostname), errmsg) )
00928                 throw ConnectException(string("can't connect ") + errmsg);
00929         }
00930 
00931         virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true, Auth::Level* level=NULL);
00932 
              // Runs checkConnection() first, then delegates to DBClientBase::query().
              // Unlike the base declaration, `query` has a default value here (an empty Query).
00933         virtual auto_ptr<DBClientCursor> query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0,
00934                                                const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) {
00935             checkConnection();
00936             return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize );
00937         }
00938 
              // Callback-style query overloads, declared only: `f` receives either
              // single documents or a batch iterator.
              // NOTE(review): the return value is presumably the number of documents
              // processed -- confirm.
00945         unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00946         unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
00947 
00948         virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
00949 
              // Reports the _failed flag.
00954         bool isFailed() const { return _failed; }
00955 
              // Returns the messaging port; asserts that one exists (p is non-null).
00956         MessagingPort& port() { assert(p); return *p; }
00957 
              // The server string, with " failed" appended when the connection is
              // in the failed state.
00958         string toStringLong() const {
00959             stringstream ss;
00960             ss << _serverString;
00961             if ( _failed ) ss << " failed";
00962             return ss.str();
00963         }
00964 
              // toString() and getServerAddress() both return _serverString.
00966         string toString() { return _serverString; }
00967 
00968         string getServerAddress() const { return _serverString; }
00969 
00970         virtual void killCursor( long long cursorID );
              // Reads go through the ordinary call() path.
00971         virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); }
00972         virtual void say( Message &toSend, bool isRetry = false , string * actualServer = 0 );
00973         virtual bool recv( Message& m );
00974         virtual void checkResponse( const char *data, int nReturned, bool* retry = NULL, string* host = NULL );
00975         virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 );
              // Always reports MASTER.
00976         virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
              // Accessors for the stored socket timeout. NOTE(review): the unit is
              // not stated in this header.
00977         void setSoTimeout(double to) { _so_timeout = to; }
00978         double getSoTimeout() const { return _so_timeout; }
00979 
              // This connector reports that it supports lazy queries.
00980         virtual bool lazySupported() const { return true; }
00981 
              // Current value of the class-wide live-connection counter.
00982         static int getNumConnections() {
00983             return _numConnections;
00984         }
00985         
              // Accessors for the class-wide lazy-kill-cursor flag.
00986         static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; }
00987         static bool getLazyKillCursor() { return _lazyKillCursor; }
00988         
00989     protected:
00990         friend class SyncClusterConnection;
00991         virtual void sayPiggyBack( Message &toSend );
00992 
              // Replica-set client passed to the constructor; may be null (the default is 0).
00993         DBClientReplicaSet *clientSet;
              // Not initialized by the constructor in this header, so both
              // scoped_ptrs start out empty.
00994         boost::scoped_ptr<MessagingPort> p;
00995         boost::scoped_ptr<SockAddr> server;
00996         bool _failed;
              // Fixed at construction time.
00997         const bool autoReconnect;
00998         time_t lastReconnectTry;
00999         HostAndPort _server; // remember for reconnects
01000         string _serverString;
01001         void _checkConnection();
01002 
01003         // throws SocketException if in failed state and not reconnecting or if waiting to reconnect
              // (only calls _checkConnection() when _failed is set)
01004         void checkConnection() { if( _failed ) _checkConnection(); }
01005 
              // NOTE(review): presumably credentials remembered by auth() so they can
              // be replayed after a reconnect; the key/value meaning is not visible here -- confirm.
01006         map< string, pair<string,string> > authCache;
01007         double _so_timeout;
01008         bool _connect( string& errmsg );
01009 
01010         static AtomicUInt _numConnections;
01011         static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op
01012 
01013 #ifdef MONGO_SSL
              // Only present in builds where MONGO_SSL is defined.
01014         static SSLManager* sslManager();
01015         static SSLManager* _sslManager;
01016 #endif
01017     };
01018 
01021     bool serverAlive( const string &uri ); // declared only; defined elsewhere
01022 
01023     DBClientBase * createDirectClient(); // declared only; returns a raw pointer. NOTE(review): ownership of the result is not stated here -- confirm
01024 
01025     BSONElement getErrField( const BSONObj& result ); // declared only; NOTE(review): presumably returns the error field of a command result -- confirm
01026     bool hasErrField( const BSONObj& result ); // declared only; NOTE(review): presumably tells whether that error field is present -- confirm
01027 
01028     inline std::ostream& operator<<( std::ostream &s, const Query &q ) {
              // Streams the query's toString() text and hands the stream back so
              // insertions can be chained.
01029         s << q.toString();
              return s;
01030     }
01031 
01032 } // namespace mongo
01033 
01034 #include "dbclientcursor.h"
01035 #include "dbclient_rs.h"
01036 #include "undef_macros.h"