|
MongoDB
1.8.5
|
00001 // Message.h 00002 00003 /* Copyright 2009 10gen Inc. 00004 * 00005 * Licensed under the Apache License, Version 2.0 (the "License"); 00006 * you may not use this file except in compliance with the License. 00007 * You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #pragma once 00019 00020 #include "../util/sock.h" 00021 #include "../bson/util/atomic_int.h" 00022 #include "hostandport.h" 00023 00024 namespace mongo { 00025 00026 extern bool noUnixSocket; 00027 00028 class Message; 00029 class MessagingPort; 00030 class PiggyBackData; 00031 typedef AtomicUInt MSGID; 00032 00033 class Listener : boost::noncopyable { 00034 public: 00035 Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { } 00036 virtual ~Listener() { 00037 if ( _timeTracker == this ) 00038 _timeTracker = 0; 00039 } 00040 void initAndListen(); // never returns unless error (start a thread) 00041 00042 /* spawn a thread, etc., then return */ 00043 virtual void accepted(int sock, const SockAddr& from); 00044 virtual void accepted(MessagingPort *mp) { 00045 assert(!"You must overwrite one of the accepted methods"); 00046 } 00047 00048 const int _port; 00049 00053 long long getMyElapsedTimeMillis() const { return _elapsedTime; } 00054 00055 void setAsTimeTracker() { 00056 _timeTracker = this; 00057 } 00058 00059 static const Listener* getTimeTracker() { 00060 return _timeTracker; 00061 } 00062 00063 static long long getElapsedTimeMillis() { 00064 if ( _timeTracker ) 00065 return _timeTracker->getMyElapsedTimeMillis(); 00066 00067 // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever. 00068 return 0; 00069 } 00070 00071 private: 00072 string _ip; 00073 bool _logConnect; 00074 long long _elapsedTime; 00075 00076 static const Listener* _timeTracker; 00077 }; 00078 00079 class AbstractMessagingPort : boost::noncopyable { 00080 public: 00081 virtual ~AbstractMessagingPort() { } 00082 virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available 00083 virtual void reply(Message& received, Message& response) = 0; 00084 00085 virtual HostAndPort remote() const = 0; 00086 virtual unsigned remotePort() const = 0; 00087 00088 private: 00089 int _clientId; 00090 }; 00091 00092 class MessagingPort : public AbstractMessagingPort { 00093 public: 00094 MessagingPort(int sock, const SockAddr& farEnd); 00095 00096 // in some cases the timeout will actually be 2x this value - eg we do a partial send, 00097 // then the timeout fires, then we try to send again, then the timeout fires again with 00098 // no data sent, then we detect that the other side is down 00099 MessagingPort(double so_timeout = 0, int logLevel = 0 ); 00100 00101 virtual ~MessagingPort(); 00102 00103 void shutdown(); 00104 00105 bool connect(SockAddr& farEnd); 00106 00107 /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. 00108 also, the Message data will go out of scope on the subsequent recv call. 00109 */ 00110 bool recv(Message& m); 00111 void reply(Message& received, Message& response, MSGID responseTo); 00112 void reply(Message& received, Message& response); 00113 bool call(Message& toSend, Message& response); 00114 00115 void say(Message& toSend, int responseTo = -1); 00116 00126 bool recv( const Message& sent , Message& response ); 00127 00128 void piggyBack( Message& toSend , int responseTo = -1 ); 00129 00130 virtual unsigned remotePort() const; 00131 virtual HostAndPort remote() const; 00132 00133 // send len or throw SocketException 00134 void send( const char * data , int len, const char *context ); 00135 void send( const vector< pair< char *, int > > &data, const char *context ); 00136 00137 // recv len or throw SocketException 00138 void recv( char * data , int len ); 00139 00140 int unsafe_recv( char *buf, int max ); 00141 00142 void clearCounters() { _bytesIn = 0; _bytesOut = 0; } 00143 long long getBytesIn() const { return _bytesIn; } 00144 long long getBytesOut() const { return _bytesOut; } 00145 private: 00146 int sock; 00147 PiggyBackData * piggyBackData; 00148 00149 long long _bytesIn; 00150 long long _bytesOut; 00151 00152 // this is the parsed version of farEnd 00153 // mutable because its initialized only on call to remote() 00154 mutable HostAndPort _farEndParsed; 00155 00156 public: 00157 SockAddr farEnd; 00158 double _timeout; 00159 int _logLevel; // passed to log() when logging errors 00160 00161 static void closeAllSockets(unsigned tagMask = 0xffffffff); 00162 00163 /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ 00164 unsigned tag; 00165 00166 friend class PiggyBackData; 00167 }; 00168 00169 enum Operations { 00170 opReply = 1, /* reply. responseTo is set. */ 00171 dbMsg = 1000, /* generic msg command followed by a string */ 00172 dbUpdate = 2001, /* update object */ 00173 dbInsert = 2002, 00174 //dbGetByOID = 2003, 00175 dbQuery = 2004, 00176 dbGetMore = 2005, 00177 dbDelete = 2006, 00178 dbKillCursors = 2007 00179 }; 00180 00181 bool doesOpGetAResponse( int op ); 00182 00183 inline const char * opToString( int op ) { 00184 switch ( op ) { 00185 case 0: return "none"; 00186 case opReply: return "reply"; 00187 case dbMsg: return "msg"; 00188 case dbUpdate: return "update"; 00189 case dbInsert: return "insert"; 00190 case dbQuery: return "query"; 00191 case dbGetMore: return "getmore"; 00192 case dbDelete: return "remove"; 00193 case dbKillCursors: return "killcursors"; 00194 default: 00195 PRINT(op); 00196 assert(0); 00197 return ""; 00198 } 00199 } 00200 00201 inline bool opIsWrite( int op ) { 00202 switch ( op ) { 00203 00204 case 0: 00205 case opReply: 00206 case dbMsg: 00207 case dbQuery: 00208 case dbGetMore: 00209 case dbKillCursors: 00210 return false; 00211 00212 case dbUpdate: 00213 case dbInsert: 00214 case dbDelete: 00215 return false; 00216 00217 default: 00218 PRINT(op); 00219 assert(0); 00220 return ""; 00221 } 00222 00223 } 00224 00225 #pragma pack(1) 00226 /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol 00227 */ 00228 struct MSGHEADER { 00229 int messageLength; // total message size, including this 00230 int requestID; // identifier for this message 00231 int responseTo; // requestID from the original request 00232 // (used in reponses from db) 00233 int opCode; 00234 }; 00235 struct OP_GETMORE : public MSGHEADER { 00236 MSGHEADER header; // standard message header 00237 int ZERO_or_flags; // 0 - reserved for future use 00238 //cstring fullCollectionName; // "dbname.collectionname" 00239 //int32 numberToReturn; // number of documents to return 00240 //int64 cursorID; // cursorID from the OP_REPLY 00241 }; 00242 #pragma pack() 00243 00244 #pragma pack(1) 00245 /* todo merge this with MSGHEADER (or inherit from it). */ 00246 struct MsgData { 00247 int len; /* len of the msg, including this field */ 00248 MSGID id; /* request/reply id's match... */ 00249 MSGID responseTo; /* id of the message we are responding to */ 00250 short _operation; 00251 char _flags; 00252 char _version; 00253 int operation() const { 00254 return _operation; 00255 } 00256 void setOperation(int o) { 00257 _flags = 0; 00258 _version = 0; 00259 _operation = o; 00260 } 00261 char _data[4]; 00262 00263 int& dataAsInt() { 00264 return *((int *) _data); 00265 } 00266 00267 bool valid() { 00268 if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) ) 00269 return false; 00270 if ( _operation < 0 || _operation > 30000 ) 00271 return false; 00272 return true; 00273 } 00274 00275 long long getCursor() { 00276 assert( responseTo > 0 ); 00277 assert( _operation == opReply ); 00278 long long * l = (long long *)(_data + 4); 00279 return l[0]; 00280 } 00281 00282 int dataLen(); // len without header 00283 }; 00284 const int MsgDataHeaderSize = sizeof(MsgData) - 4; 00285 inline int MsgData::dataLen() { 00286 return len - MsgDataHeaderSize; 00287 } 00288 #pragma pack() 00289 00290 class Message { 00291 public: 00292 // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). 00293 Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {} 00294 Message( void * data , bool freeIt ) : 00295 _buf( 0 ), _data( 0 ), _freeIt( false ) { 00296 _setData( reinterpret_cast< MsgData* >( data ), freeIt ); 00297 }; 00298 Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) { 00299 *this = r; 00300 } 00301 ~Message() { 00302 reset(); 00303 } 00304 00305 SockAddr _from; 00306 00307 MsgData *header() const { 00308 assert( !empty() ); 00309 return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first ); 00310 } 00311 int operation() const { return header()->operation(); } 00312 00313 MsgData *singleData() const { 00314 massert( 13273, "single data buffer expected", _buf ); 00315 return header(); 00316 } 00317 00318 bool empty() const { return !_buf && _data.empty(); } 00319 00320 int size() const { 00321 int res = 0; 00322 if ( _buf ) { 00323 res = _buf->len; 00324 } 00325 else { 00326 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) { 00327 res += it->second; 00328 } 00329 } 00330 return res; 00331 } 00332 00333 int dataSize() const { return size() - sizeof(MSGHEADER); } 00334 00335 // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy 00336 // can get rid of this if we make response handling smarter 00337 void concat() { 00338 if ( _buf || empty() ) { 00339 return; 00340 } 00341 00342 assert( _freeIt ); 00343 int totalSize = 0; 00344 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00345 totalSize += i->second; 00346 } 00347 char *buf = (char*)malloc( totalSize ); 00348 char *p = buf; 00349 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00350 memcpy( p, i->first, i->second ); 00351 p += i->second; 00352 } 00353 reset(); 00354 _setData( (MsgData*)buf, true ); 00355 } 00356 00357 // vector swap() so this is fast 00358 Message& operator=(Message& r) { 00359 assert( empty() ); 00360 assert( r._freeIt ); 00361 _buf = r._buf; 00362 r._buf = 0; 00363 if ( r._data.size() > 0 ) { 00364 _data.swap( r._data ); 00365 } 00366 r._freeIt = false; 00367 _freeIt = true; 00368 return *this; 00369 } 00370 00371 void reset() { 00372 if ( _freeIt ) { 00373 if ( _buf ) { 00374 free( _buf ); 00375 } 00376 for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { 00377 free(i->first); 00378 } 00379 } 00380 _buf = 0; 00381 _data.clear(); 00382 _freeIt = false; 00383 } 00384 00385 // use to add a buffer 00386 // assumes message will free everything 00387 void appendData(char *d, int size) { 00388 if ( size <= 0 ) { 00389 return; 00390 } 00391 if ( empty() ) { 00392 MsgData *md = (MsgData*)d; 00393 md->len = size; // can be updated later if more buffers added 00394 _setData( md, true ); 00395 return; 00396 } 00397 assert( _freeIt ); 00398 if ( _buf ) { 00399 _data.push_back( make_pair( (char*)_buf, _buf->len ) ); 00400 _buf = 0; 00401 } 00402 _data.push_back( make_pair( d, size ) ); 00403 header()->len += size; 00404 } 00405 00406 // use to set first buffer if empty 00407 void setData(MsgData *d, bool freeIt) { 00408 assert( empty() ); 00409 _setData( d, freeIt ); 00410 } 00411 void setData(int operation, const char *msgtxt) { 00412 setData(operation, msgtxt, strlen(msgtxt)+1); 00413 } 00414 void setData(int operation, const char *msgdata, size_t len) { 00415 assert( empty() ); 00416 size_t dataLen = len + sizeof(MsgData) - 4; 00417 MsgData *d = (MsgData *) malloc(dataLen); 00418 memcpy(d->_data, msgdata, len); 00419 d->len = fixEndian(dataLen); 00420 d->setOperation(operation); 00421 _setData( d, true ); 00422 } 00423 00424 bool doIFreeIt() { 00425 return _freeIt; 00426 } 00427 00428 void send( MessagingPort &p, const char *context ) { 00429 if ( empty() ) { 00430 return; 00431 } 00432 if ( _buf != 0 ) { 00433 p.send( (char*)_buf, _buf->len, context ); 00434 } 00435 else { 00436 p.send( _data, context ); 00437 } 00438 } 00439 00440 private: 00441 void _setData( MsgData *d, bool freeIt ) { 00442 _freeIt = freeIt; 00443 _buf = d; 00444 } 00445 // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data 00446 MsgData * _buf; 00447 // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead 00448 typedef vector< pair< char*, int > > MsgVec; 00449 MsgVec _data; 00450 bool _freeIt; 00451 }; 00452 00453 class SocketException : public DBException { 00454 public: 00455 const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type; 00456 00457 SocketException( Type t , string server="" , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ } 00458 virtual ~SocketException() throw() {} 00459 00460 bool shouldPrint() const { return _type != CLOSED; } 00461 virtual string toString() const; 00462 00463 private: 00464 string _server; 00465 string _extra; 00466 }; 00467 00468 MSGID nextMessageId(); 00469 00470 extern TicketHolder connTicketHolder; 00471 00472 class ElapsedTracker { 00473 public: 00474 ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks ) 00475 : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) { 00476 _last = Listener::getElapsedTimeMillis(); 00477 } 00478 00483 bool ping() { 00484 if ( ( ++_pings % _h ) == 0 ) { 00485 _last = Listener::getElapsedTimeMillis(); 00486 return true; 00487 } 00488 00489 long long now = Listener::getElapsedTimeMillis(); 00490 if ( now - _last > _ms ) { 00491 _last = now; 00492 return true; 00493 } 00494 00495 return false; 00496 } 00497 00498 private: 00499 int _h; 00500 int _ms; 00501 00502 unsigned long long _pings; 00503 00504 long long _last; 00505 00506 }; 00507 00508 } // namespace mongo
1.7.5.1