MongoDB  2.1.1-pre-
message.h
00001 // message.h
00002 
00003 /*    Copyright 2009 10gen Inc.
00004  *
00005  *    Licensed under the Apache License, Version 2.0 (the "License");
00006  *    you may not use this file except in compliance with the License.
00007  *    You may obtain a copy of the License at
00008  *
00009  *    http://www.apache.org/licenses/LICENSE-2.0
00010  *
00011  *    Unless required by applicable law or agreed to in writing, software
00012  *    distributed under the License is distributed on an "AS IS" BASIS,
00013  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014  *    See the License for the specific language governing permissions and
00015  *    limitations under the License.
00016  */
00017 
00018 #pragma once
00019 
00020 #include "sock.h"
00021 #include "../../bson/util/atomic_int.h"
00022 #include "hostandport.h"
00023 
00024 namespace mongo {
00025 
00026     class Message;
00027     class MessagingPort;
00028     class PiggyBackData;
00029 
00030     typedef AtomicUInt MSGID;
00031 
00032     enum Operations {
00033         opReply = 1,     /* reply. responseTo is set. */
00034         dbMsg = 1000,    /* generic msg command followed by a string */
00035         dbUpdate = 2001, /* update object */
00036         dbInsert = 2002,
00037         //dbGetByOID = 2003,
00038         dbQuery = 2004,
00039         dbGetMore = 2005,
00040         dbDelete = 2006,
00041         dbKillCursors = 2007
00042     };
00043 
00044     bool doesOpGetAResponse( int op );
00045 
00046     inline const char * opToString( int op ) {
00047         switch ( op ) {
00048         case 0: return "none";
00049         case opReply: return "reply";
00050         case dbMsg: return "msg";
00051         case dbUpdate: return "update";
00052         case dbInsert: return "insert";
00053         case dbQuery: return "query";
00054         case dbGetMore: return "getmore";
00055         case dbDelete: return "remove";
00056         case dbKillCursors: return "killcursors";
00057         default:
00058             PRINT(op);
00059             assert(0);
00060             return "";
00061         }
00062     }
00063 
00064     inline bool opIsWrite( int op ) {
00065         switch ( op ) {
00066 
00067         case 0:
00068         case opReply:
00069         case dbMsg:
00070         case dbQuery:
00071         case dbGetMore:
00072         case dbKillCursors:
00073             return false;
00074 
00075         case dbUpdate:
00076         case dbInsert:
00077         case dbDelete:
00078             return true;
00079 
00080         default:
00081             PRINT(op);
00082             assert(0);
00083             return "";
00084         }
00085 
00086     }
00087 
#pragma pack(1)
    /* Message header: four int fields, packed without padding.
       see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
    */
    struct MSGHEADER {
        int messageLength;  // total message size, including this header
        int requestID;      // identifier for this message
        int responseTo;     // requestID from the original request
                            //   (used in responses from db)
        int opCode;
    };

    /* Fixed-size leading part of a get-more message. The variable-length
       fields that follow it are listed below as comments only.

       NOTE(review): this struct derives from MSGHEADER and also embeds a
       MSGHEADER member, so the header fields occur twice in its layout --
       confirm whether that is intended before relying on sizeof(OP_GETMORE).
    */
    struct OP_GETMORE : public MSGHEADER {
        MSGHEADER header;          // standard message header
        int       ZERO_or_flags;   // 0 - reserved for future use
        // cstring fullCollectionName;  // "dbname.collectionname"
        // int32   numberToReturn;      // number of documents to return
        // int64   cursorID;            // cursorID from the OP_REPLY
    };
#pragma pack()
00106 
00107 #pragma pack(1)
00108     /* todo merge this with MSGHEADER (or inherit from it). */
00109     struct MsgData {
00110         int len; /* len of the msg, including this field */
00111         MSGID id; /* request/reply id's match... */
00112         MSGID responseTo; /* id of the message we are responding to */
00113         short _operation;
00114         char _flags;
00115         char _version;
00116         int operation() const {
00117             return _operation;
00118         }
00119         void setOperation(int o) {
00120             _flags = 0;
00121             _version = 0;
00122             _operation = o;
00123         }
00124         char _data[4];
00125 
00126         int& dataAsInt() {
00127             return *((int *) _data);
00128         }
00129 
00130         bool valid() {
00131             if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) )
00132                 return false;
00133             if ( _operation < 0 || _operation > 30000 )
00134                 return false;
00135             return true;
00136         }
00137 
00138         long long getCursor() {
00139             assert( responseTo > 0 );
00140             assert( _operation == opReply );
00141             long long * l = (long long *)(_data + 4);
00142             return l[0];
00143         }
00144 
00145         int dataLen(); // len without header
00146     };
00147     const int MsgDataHeaderSize = sizeof(MsgData) - 4;
00148     inline int MsgData::dataLen() {
00149         return len - MsgDataHeaderSize;
00150     }
00151 #pragma pack()
00152 
00153     class Message {
00154     public:
00155         // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
00156         Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
00157         Message( void * data , bool freeIt ) :
00158             _buf( 0 ), _data( 0 ), _freeIt( false ) {
00159             _setData( reinterpret_cast< MsgData* >( data ), freeIt );
00160         };
00161         Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
00162             *this = r;
00163         }
00164         ~Message() {
00165             reset();
00166         }
00167 
00168         SockAddr _from;
00169 
00170         MsgData *header() const {
00171             assert( !empty() );
00172             return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first );
00173         }
00174         int operation() const { return header()->operation(); }
00175 
00176         MsgData *singleData() const {
00177             massert( 13273, "single data buffer expected", _buf );
00178             return header();
00179         }
00180 
00181         bool empty() const { return !_buf && _data.empty(); }
00182 
00183         int size() const {
00184             int res = 0;
00185             if ( _buf ) {
00186                 res =  _buf->len;
00187             }
00188             else {
00189                 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
00190                     res += it->second;
00191                 }
00192             }
00193             return res;
00194         }
00195 
00196         int dataSize() const { return size() - sizeof(MSGHEADER); }
00197 
00198         // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
00199         // can get rid of this if we make response handling smarter
00200         void concat() {
00201             if ( _buf || empty() ) {
00202                 return;
00203             }
00204 
00205             assert( _freeIt );
00206             int totalSize = 0;
00207             for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00208                 totalSize += i->second;
00209             }
00210             char *buf = (char*)malloc( totalSize );
00211             char *p = buf;
00212             for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00213                 memcpy( p, i->first, i->second );
00214                 p += i->second;
00215             }
00216             reset();
00217             _setData( (MsgData*)buf, true );
00218         }
00219 
00220         // vector swap() so this is fast
00221         Message& operator=(Message& r) {
00222             assert( empty() );
00223             assert( r._freeIt );
00224             _buf = r._buf;
00225             r._buf = 0;
00226             if ( r._data.size() > 0 ) {
00227                 _data.swap( r._data );
00228             }
00229             r._freeIt = false;
00230             _freeIt = true;
00231             return *this;
00232         }
00233 
00234         void reset() {
00235             if ( _freeIt ) {
00236                 if ( _buf ) {
00237                     free( _buf );
00238                 }
00239                 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
00240                     free(i->first);
00241                 }
00242             }
00243             _buf = 0;
00244             _data.clear();
00245             _freeIt = false;
00246         }
00247 
00248         // use to add a buffer
00249         // assumes message will free everything
00250         void appendData(char *d, int size) {
00251             if ( size <= 0 ) {
00252                 return;
00253             }
00254             if ( empty() ) {
00255                 MsgData *md = (MsgData*)d;
00256                 md->len = size; // can be updated later if more buffers added
00257                 _setData( md, true );
00258                 return;
00259             }
00260             assert( _freeIt );
00261             if ( _buf ) {
00262                 _data.push_back( make_pair( (char*)_buf, _buf->len ) );
00263                 _buf = 0;
00264             }
00265             _data.push_back( make_pair( d, size ) );
00266             header()->len += size;
00267         }
00268 
00269         // use to set first buffer if empty
00270         void setData(MsgData *d, bool freeIt) {
00271             assert( empty() );
00272             _setData( d, freeIt );
00273         }
00274         void setData(int operation, const char *msgtxt) {
00275             setData(operation, msgtxt, strlen(msgtxt)+1);
00276         }
00277         void setData(int operation, const char *msgdata, size_t len) {
00278             assert( empty() );
00279             size_t dataLen = len + sizeof(MsgData) - 4;
00280             MsgData *d = (MsgData *) malloc(dataLen);
00281             memcpy(d->_data, msgdata, len);
00282             d->len = fixEndian(dataLen);
00283             d->setOperation(operation);
00284             _setData( d, true );
00285         }
00286 
00287         bool doIFreeIt() {
00288             return _freeIt;
00289         }
00290 
00291         void send( MessagingPort &p, const char *context );
00292         
00293         string toString() const;
00294 
00295     private:
00296         void _setData( MsgData *d, bool freeIt ) {
00297             _freeIt = freeIt;
00298             _buf = d;
00299         }
00300         // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
00301         MsgData * _buf;
00302         // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
00303         typedef vector< pair< char*, int > > MsgVec;
00304         MsgVec _data;
00305         bool _freeIt;
00306     };
00307 
00308 
00309     MSGID nextMessageId();
00310 
00311 
00312 } // namespace mongo