MongoDB  1.8.5
message.h
00001 // Message.h
00002 
00003 /*    Copyright 2009 10gen Inc.
00004  *
00005  *    Licensed under the Apache License, Version 2.0 (the "License");
00006  *    you may not use this file except in compliance with the License.
00007  *    You may obtain a copy of the License at
00008  *
00009  *    http://www.apache.org/licenses/LICENSE-2.0
00010  *
00011  *    Unless required by applicable law or agreed to in writing, software
00012  *    distributed under the License is distributed on an "AS IS" BASIS,
00013  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014  *    See the License for the specific language governing permissions and
00015  *    limitations under the License.
00016  */
00017 
00018 #pragma once
00019 
00020 #include "../util/sock.h"
00021 #include "../bson/util/atomic_int.h"
00022 #include "hostandport.h"
00023 
00024 namespace mongo {
00025 
00026     extern bool noUnixSocket;
00027 
00028     class Message;
00029     class MessagingPort;
00030     class PiggyBackData;
00031     typedef AtomicUInt MSGID;
00032 
00033     class Listener : boost::noncopyable {
00034     public:
00035         Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { }
00036         virtual ~Listener() {
00037             if ( _timeTracker == this )
00038                 _timeTracker = 0;
00039         }
00040         void initAndListen(); // never returns unless error (start a thread)
00041 
00042         /* spawn a thread, etc., then return */
00043         virtual void accepted(int sock, const SockAddr& from);
00044         virtual void accepted(MessagingPort *mp) {
00045             assert(!"You must overwrite one of the accepted methods");
00046         }
00047 
00048         const int _port;
00049 
00053         long long getMyElapsedTimeMillis() const { return _elapsedTime; }
00054 
00055         void setAsTimeTracker() {
00056             _timeTracker = this;
00057         }
00058 
00059         static const Listener* getTimeTracker() {
00060             return _timeTracker;
00061         }
00062 
00063         static long long getElapsedTimeMillis() {
00064             if ( _timeTracker )
00065                 return _timeTracker->getMyElapsedTimeMillis();
00066 
00067             // should this assert or throw?  seems like callers may not expect to get zero back, certainly not forever.
00068             return 0;
00069         }
00070 
00071     private:
00072         string _ip;
00073         bool _logConnect;
00074         long long _elapsedTime;
00075 
00076         static const Listener* _timeTracker;
00077     };
00078 
00079     class AbstractMessagingPort : boost::noncopyable {
00080     public:
00081         virtual ~AbstractMessagingPort() { }
00082         virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
00083         virtual void reply(Message& received, Message& response) = 0;
00084 
00085         virtual HostAndPort remote() const = 0;
00086         virtual unsigned remotePort() const = 0;
00087 
00088     private:
00089         int _clientId;
00090     };
00091 
00092     class MessagingPort : public AbstractMessagingPort {
00093     public:
00094         MessagingPort(int sock, const SockAddr& farEnd);
00095 
00096         // in some cases the timeout will actually be 2x this value - eg we do a partial send,
00097         // then the timeout fires, then we try to send again, then the timeout fires again with
00098         // no data sent, then we detect that the other side is down
00099         MessagingPort(double so_timeout = 0, int logLevel = 0 );
00100 
00101         virtual ~MessagingPort();
00102 
00103         void shutdown();
00104 
00105         bool connect(SockAddr& farEnd);
00106 
00107         /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
00108            also, the Message data will go out of scope on the subsequent recv call.
00109         */
00110         bool recv(Message& m);
00111         void reply(Message& received, Message& response, MSGID responseTo);
00112         void reply(Message& received, Message& response);
00113         bool call(Message& toSend, Message& response);
00114 
00115         void say(Message& toSend, int responseTo = -1);
00116 
00126         bool recv( const Message& sent , Message& response );
00127 
00128         void piggyBack( Message& toSend , int responseTo = -1 );
00129 
00130         virtual unsigned remotePort() const;
00131         virtual HostAndPort remote() const;
00132 
00133         // send len or throw SocketException
00134         void send( const char * data , int len, const char *context );
00135         void send( const vector< pair< char *, int > > &data, const char *context );
00136 
00137         // recv len or throw SocketException
00138         void recv( char * data , int len );
00139 
00140         int unsafe_recv( char *buf, int max );
00141 
00142         void clearCounters() { _bytesIn = 0; _bytesOut = 0; }
00143         long long getBytesIn() const { return _bytesIn; }
00144         long long getBytesOut() const { return _bytesOut; }
00145     private:
00146         int sock;
00147         PiggyBackData * piggyBackData;
00148 
00149         long long _bytesIn;
00150         long long _bytesOut;
00151         
00152         // this is the parsed version of farEnd
00153         // mutable because its initialized only on call to remote()
00154         mutable HostAndPort _farEndParsed; 
00155 
00156     public:
00157         SockAddr farEnd;
00158         double _timeout;
00159         int _logLevel; // passed to log() when logging errors
00160 
00161         static void closeAllSockets(unsigned tagMask = 0xffffffff);
00162 
00163         /* ports can be tagged with various classes.  see closeAllSockets(tag). defaults to 0. */
00164         unsigned tag;
00165 
00166         friend class PiggyBackData;
00167     };
00168 
00169     enum Operations {
00170         opReply = 1,     /* reply. responseTo is set. */
00171         dbMsg = 1000,    /* generic msg command followed by a string */
00172         dbUpdate = 2001, /* update object */
00173         dbInsert = 2002,
00174         //dbGetByOID = 2003,
00175         dbQuery = 2004,
00176         dbGetMore = 2005,
00177         dbDelete = 2006,
00178         dbKillCursors = 2007
00179     };
00180 
00181     bool doesOpGetAResponse( int op );
00182 
00183     inline const char * opToString( int op ) {
00184         switch ( op ) {
00185         case 0: return "none";
00186         case opReply: return "reply";
00187         case dbMsg: return "msg";
00188         case dbUpdate: return "update";
00189         case dbInsert: return "insert";
00190         case dbQuery: return "query";
00191         case dbGetMore: return "getmore";
00192         case dbDelete: return "remove";
00193         case dbKillCursors: return "killcursors";
00194         default:
00195             PRINT(op);
00196             assert(0);
00197             return "";
00198         }
00199     }
00200 
00201     inline bool opIsWrite( int op ) {
00202         switch ( op ) {
00203 
00204         case 0:
00205         case opReply:
00206         case dbMsg:
00207         case dbQuery:
00208         case dbGetMore:
00209         case dbKillCursors:
00210             return false;
00211 
00212         case dbUpdate:
00213         case dbInsert:
00214         case dbDelete:
00215             return false;
00216 
00217         default:
00218             PRINT(op);
00219             assert(0);
00220             return "";
00221         }
00222 
00223     }
00224 
#pragma pack(1)
    /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
    */
    /** Standard wire-protocol message header: four ints, packed with no padding. */
    struct MSGHEADER {
        int messageLength; // total message size, including this header
        int requestID;     // identifier for this message
        int responseTo;    // requestID from the original request (used in responses from db)
        int opCode;
    };

    // NOTE(review): OP_GETMORE both derives from MSGHEADER and embeds a second
    // MSGHEADER member, so it carries two headers (36 bytes under pack(1)),
    // unlike the single-header layout its field comments describe. Left as-is
    // because callers may use either the inherited fields or `header`;
    // confirm usage before fixing.
    struct OP_GETMORE : public MSGHEADER {
        MSGHEADER header;             // standard message header
        int       ZERO_or_flags;      // 0 - reserved for future use
        // variable-length remainder, not modeled as members:
        //   cstring   fullCollectionName; // "dbname.collectionname"
        //   int32     numberToReturn;     // number of documents to return
        //   int64     cursorID;           // cursorID from the OP_REPLY
    };
#pragma pack()
00243 
00244 #pragma pack(1)
00245     /* todo merge this with MSGHEADER (or inherit from it). */
00246     struct MsgData {
00247         int len; /* len of the msg, including this field */
00248         MSGID id; /* request/reply id's match... */
00249         MSGID responseTo; /* id of the message we are responding to */
00250         short _operation;
00251         char _flags;
00252         char _version;
00253         int operation() const {
00254             return _operation;
00255         }
00256         void setOperation(int o) {
00257             _flags = 0;
00258             _version = 0;
00259             _operation = o;
00260         }
00261         char _data[4];
00262 
00263         int& dataAsInt() {
00264             return *((int *) _data);
00265         }
00266 
00267         bool valid() {
00268             if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) )
00269                 return false;
00270             if ( _operation < 0 || _operation > 30000 )
00271                 return false;
00272             return true;
00273         }
00274 
00275         long long getCursor() {
00276             assert( responseTo > 0 );
00277             assert( _operation == opReply );
00278             long long * l = (long long *)(_data + 4);
00279             return l[0];
00280         }
00281 
00282         int dataLen(); // len without header
00283     };
00284     const int MsgDataHeaderSize = sizeof(MsgData) - 4;
00285     inline int MsgData::dataLen() {
00286         return len - MsgDataHeaderSize;
00287     }
00288 #pragma pack()
00289 
00290     class Message {
00291     public:
00292         // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
00293         Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
00294         Message( void * data , bool freeIt ) :
00295             _buf( 0 ), _data( 0 ), _freeIt( false ) {
00296             _setData( reinterpret_cast< MsgData* >( data ), freeIt );
00297         };
00298         Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
00299             *this = r;
00300         }
00301         ~Message() {
00302             reset();
00303         }
00304 
00305         SockAddr _from;
00306 
00307         MsgData *header() const {
00308             assert( !empty() );
00309             return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first );
00310         }
00311         int operation() const { return header()->operation(); }
00312 
00313         MsgData *singleData() const {
00314             massert( 13273, "single data buffer expected", _buf );
00315             return header();
00316         }
00317 
00318         bool empty() const { return !_buf && _data.empty(); }
00319 
00320         int size() const {
00321             int res = 0;
00322             if ( _buf ) {
00323                 res =  _buf->len;
00324             }
00325             else {
00326                 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
00327                     res += it->second;
00328                 }
00329             }
00330             return res;
00331         }
00332 
00333         int dataSize() const { return size() - sizeof(MSGHEADER); }
00334 
00335         // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
00336         // can get rid of this if we make response handling smarter
00337         void concat() {
00338             if ( _buf || empty() ) {
00339                 return;
00340             }
00341 
00342             assert( _freeIt );
00343             int totalSize = 0;
00344             for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00345                 totalSize += i->second;
00346             }
00347             char *buf = (char*)malloc( totalSize );
00348             char *p = buf;
00349             for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00350                 memcpy( p, i->first, i->second );
00351                 p += i->second;
00352             }
00353             reset();
00354             _setData( (MsgData*)buf, true );
00355         }
00356 
00357         // vector swap() so this is fast
00358         Message& operator=(Message& r) {
00359             assert( empty() );
00360             assert( r._freeIt );
00361             _buf = r._buf;
00362             r._buf = 0;
00363             if ( r._data.size() > 0 ) {
00364                 _data.swap( r._data );
00365             }
00366             r._freeIt = false;
00367             _freeIt = true;
00368             return *this;
00369         }
00370 
00371         void reset() {
00372             if ( _freeIt ) {
00373                 if ( _buf ) {
00374                     free( _buf );
00375                 }
00376                 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00377                     free(i->first);
00378                 }
00379             }
00380             _buf = 0;
00381             _data.clear();
00382             _freeIt = false;
00383         }
00384 
00385         // use to add a buffer
00386         // assumes message will free everything
00387         void appendData(char *d, int size) {
00388             if ( size <= 0 ) {
00389                 return;
00390             }
00391             if ( empty() ) {
00392                 MsgData *md = (MsgData*)d;
00393                 md->len = size; // can be updated later if more buffers added
00394                 _setData( md, true );
00395                 return;
00396             }
00397             assert( _freeIt );
00398             if ( _buf ) {
00399                 _data.push_back( make_pair( (char*)_buf, _buf->len ) );
00400                 _buf = 0;
00401             }
00402             _data.push_back( make_pair( d, size ) );
00403             header()->len += size;
00404         }
00405 
00406         // use to set first buffer if empty
00407         void setData(MsgData *d, bool freeIt) {
00408             assert( empty() );
00409             _setData( d, freeIt );
00410         }
00411         void setData(int operation, const char *msgtxt) {
00412             setData(operation, msgtxt, strlen(msgtxt)+1);
00413         }
00414         void setData(int operation, const char *msgdata, size_t len) {
00415             assert( empty() );
00416             size_t dataLen = len + sizeof(MsgData) - 4;
00417             MsgData *d = (MsgData *) malloc(dataLen);
00418             memcpy(d->_data, msgdata, len);
00419             d->len = fixEndian(dataLen);
00420             d->setOperation(operation);
00421             _setData( d, true );
00422         }
00423 
00424         bool doIFreeIt() {
00425             return _freeIt;
00426         }
00427 
00428         void send( MessagingPort &p, const char *context ) {
00429             if ( empty() ) {
00430                 return;
00431             }
00432             if ( _buf != 0 ) {
00433                 p.send( (char*)_buf, _buf->len, context );
00434             }
00435             else {
00436                 p.send( _data, context );
00437             }
00438         }
00439 
00440     private:
00441         void _setData( MsgData *d, bool freeIt ) {
00442             _freeIt = freeIt;
00443             _buf = d;
00444         }
00445         // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
00446         MsgData * _buf;
00447         // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
00448         typedef vector< pair< char*, int > > MsgVec;
00449         MsgVec _data;
00450         bool _freeIt;
00451     };
00452 
00453     class SocketException : public DBException {
00454     public:
00455         const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
00456         
00457         SocketException( Type t , string server="" , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
00458         virtual ~SocketException() throw() {}
00459 
00460         bool shouldPrint() const { return _type != CLOSED; }
00461         virtual string toString() const;
00462 
00463     private:
00464         string _server;
00465         string _extra;
00466     };
00467 
00468     MSGID nextMessageId();
00469 
00470     extern TicketHolder connTicketHolder;
00471 
00472     class ElapsedTracker {
00473     public:
00474         ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
00475             : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) {
00476             _last = Listener::getElapsedTimeMillis();
00477         }
00478 
00483         bool ping() {
00484             if ( ( ++_pings % _h ) == 0 ) {
00485                 _last = Listener::getElapsedTimeMillis();
00486                 return true;
00487             }
00488 
00489             long long now = Listener::getElapsedTimeMillis();
00490             if ( now - _last > _ms ) {
00491                 _last = now;
00492                 return true;
00493             }
00494 
00495             return false;
00496         }
00497 
00498     private:
00499         int _h;
00500         int _ms;
00501 
00502         unsigned long long _pings;
00503 
00504         long long _last;
00505 
00506     };
00507 
00508 } // namespace mongo