|
MongoDB
2.1.1-pre-
|
00001 // message.h 00002 00003 /* Copyright 2009 10gen Inc. 00004 * 00005 * Licensed under the Apache License, Version 2.0 (the "License"); 00006 * you may not use this file except in compliance with the License. 00007 * You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #pragma once 00019 00020 #include "sock.h" 00021 #include "../../bson/util/atomic_int.h" 00022 #include "hostandport.h" 00023 00024 namespace mongo { 00025 00026 class Message; 00027 class MessagingPort; 00028 class PiggyBackData; 00029 00030 typedef AtomicUInt MSGID; 00031 00032 enum Operations { 00033 opReply = 1, /* reply. responseTo is set. */ 00034 dbMsg = 1000, /* generic msg command followed by a string */ 00035 dbUpdate = 2001, /* update object */ 00036 dbInsert = 2002, 00037 //dbGetByOID = 2003, 00038 dbQuery = 2004, 00039 dbGetMore = 2005, 00040 dbDelete = 2006, 00041 dbKillCursors = 2007 00042 }; 00043 00044 bool doesOpGetAResponse( int op ); 00045 00046 inline const char * opToString( int op ) { 00047 switch ( op ) { 00048 case 0: return "none"; 00049 case opReply: return "reply"; 00050 case dbMsg: return "msg"; 00051 case dbUpdate: return "update"; 00052 case dbInsert: return "insert"; 00053 case dbQuery: return "query"; 00054 case dbGetMore: return "getmore"; 00055 case dbDelete: return "remove"; 00056 case dbKillCursors: return "killcursors"; 00057 default: 00058 PRINT(op); 00059 assert(0); 00060 return ""; 00061 } 00062 } 00063 00064 inline bool opIsWrite( int op ) { 00065 switch ( op ) { 00066 00067 case 0: 00068 case opReply: 00069 case dbMsg: 00070 case dbQuery: 00071 case dbGetMore: 00072 case dbKillCursors: 00073 return false; 00074 00075 case dbUpdate: 00076 case dbInsert: 00077 case dbDelete: 00078 return true; 00079 00080 default: 00081 PRINT(op); 00082 assert(0); 00083 return ""; 00084 } 00085 00086 } 00087 00088 #pragma pack(1) 00089 /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol 00090 */ 00091 struct MSGHEADER { 00092 int messageLength; // total message size, including this 00093 int requestID; // identifier for this message 00094 int responseTo; // requestID from the original request 00095 // (used in reponses from db) 00096 int opCode; 00097 }; 00098 struct OP_GETMORE : public MSGHEADER { 00099 MSGHEADER header; // standard message header 00100 int ZERO_or_flags; // 0 - reserved for future use 00101 //cstring fullCollectionName; // "dbname.collectionname" 00102 //int32 numberToReturn; // number of documents to return 00103 //int64 cursorID; // cursorID from the OP_REPLY 00104 }; 00105 #pragma pack() 00106 00107 #pragma pack(1) 00108 /* todo merge this with MSGHEADER (or inherit from it). */ 00109 struct MsgData { 00110 int len; /* len of the msg, including this field */ 00111 MSGID id; /* request/reply id's match... */ 00112 MSGID responseTo; /* id of the message we are responding to */ 00113 short _operation; 00114 char _flags; 00115 char _version; 00116 int operation() const { 00117 return _operation; 00118 } 00119 void setOperation(int o) { 00120 _flags = 0; 00121 _version = 0; 00122 _operation = o; 00123 } 00124 char _data[4]; 00125 00126 int& dataAsInt() { 00127 return *((int *) _data); 00128 } 00129 00130 bool valid() { 00131 if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) ) 00132 return false; 00133 if ( _operation < 0 || _operation > 30000 ) 00134 return false; 00135 return true; 00136 } 00137 00138 long long getCursor() { 00139 assert( responseTo > 0 ); 00140 assert( _operation == opReply ); 00141 long long * l = (long long *)(_data + 4); 00142 return l[0]; 00143 } 00144 00145 int dataLen(); // len without header 00146 }; 00147 const int MsgDataHeaderSize = sizeof(MsgData) - 4; 00148 inline int MsgData::dataLen() { 00149 return len - MsgDataHeaderSize; 00150 } 00151 #pragma pack() 00152 00153 class Message { 00154 public: 00155 // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). 00156 Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {} 00157 Message( void * data , bool freeIt ) : 00158 _buf( 0 ), _data( 0 ), _freeIt( false ) { 00159 _setData( reinterpret_cast< MsgData* >( data ), freeIt ); 00160 }; 00161 Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) { 00162 *this = r; 00163 } 00164 ~Message() { 00165 reset(); 00166 } 00167 00168 SockAddr _from; 00169 00170 MsgData *header() const { 00171 assert( !empty() ); 00172 return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first ); 00173 } 00174 int operation() const { return header()->operation(); } 00175 00176 MsgData *singleData() const { 00177 massert( 13273, "single data buffer expected", _buf ); 00178 return header(); 00179 } 00180 00181 bool empty() const { return !_buf && _data.empty(); } 00182 00183 int size() const { 00184 int res = 0; 00185 if ( _buf ) { 00186 res = _buf->len; 00187 } 00188 else { 00189 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { 00190 res += it->second; 00191 } 00192 } 00193 return res; 00194 } 00195 00196 int dataSize() const { return size() - sizeof(MSGHEADER); } 00197 00198 // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy 00199 // can get rid of this if we make response handling smarter 00200 void concat() { 00201 if ( _buf || empty() ) { 00202 return; 00203 } 00204 00205 assert( _freeIt ); 00206 int totalSize = 0; 00207 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00208 totalSize += i->second; 00209 } 00210 char *buf = (char*)malloc( totalSize ); 00211 char *p = buf; 00212 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00213 memcpy( p, i->first, i->second ); 00214 p += i->second; 00215 } 00216 reset(); 00217 _setData( (MsgData*)buf, true ); 00218 } 00219 00220 // vector swap() so this is fast 00221 Message& operator=(Message& r) { 00222 assert( empty() ); 00223 assert( r._freeIt ); 00224 _buf = r._buf; 00225 r._buf = 0; 00226 if ( r._data.size() > 0 ) { 00227 _data.swap( r._data ); 00228 } 00229 r._freeIt = false; 00230 _freeIt = true; 00231 return *this; 00232 } 00233 00234 void reset() { 00235 if ( _freeIt ) { 00236 if ( _buf ) { 00237 free( _buf ); 00238 } 00239 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00240 free(i->first); 00241 } 00242 } 00243 _buf = 0; 00244 _data.clear(); 00245 _freeIt = false; 00246 } 00247 00248 // use to add a buffer 00249 // assumes message will free everything 00250 void appendData(char *d, int size) { 00251 if ( size <= 0 ) { 00252 return; 00253 } 00254 if ( empty() ) { 00255 MsgData *md = (MsgData*)d; 00256 md->len = size; // can be updated later if more buffers added 00257 _setData( md, true ); 00258 return; 00259 } 00260 assert( _freeIt ); 00261 if ( _buf ) { 00262 _data.push_back( make_pair( (char*)_buf, _buf->len ) ); 00263 _buf = 0; 00264 } 00265 _data.push_back( make_pair( d, size ) ); 00266 header()->len += size; 00267 } 00268 00269 // use to set first buffer if empty 00270 void setData(MsgData *d, bool freeIt) { 00271 assert( empty() ); 00272 _setData( d, freeIt ); 00273 } 00274 void setData(int operation, const char *msgtxt) { 00275 setData(operation, msgtxt, strlen(msgtxt)+1); 00276 } 00277 void setData(int operation, const char *msgdata, size_t len) { 00278 assert( empty() ); 00279 size_t dataLen = len + sizeof(MsgData) - 4; 00280 MsgData *d = (MsgData *) malloc(dataLen); 00281 memcpy(d->_data, msgdata, len); 00282 d->len = fixEndian(dataLen); 00283 d->setOperation(operation); 00284 _setData( d, true ); 00285 } 00286 00287 bool doIFreeIt() { 00288 return _freeIt; 00289 } 00290 00291 void send( MessagingPort &p, const char *context ); 00292 00293 string toString() const; 00294 00295 private: 00296 void _setData( MsgData *d, bool freeIt ) { 00297 _freeIt = freeIt; 00298 _buf = d; 00299 } 00300 // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data 00301 MsgData * _buf; 00302 // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead 00303 typedef vector< pair< char*, int > > MsgVec; 00304 MsgVec _data; 00305 bool _freeIt; 00306 }; 00307 00308 00309 MSGID nextMessageId(); 00310 00311 00312 } // namespace mongo
1.8.0