Class: Mongo::ReplSetConnection

Inherits:
Connection show all
Defined in:
lib/mongo/repl_set_connection.rb

Overview

Instantiates and manages connections to a MongoDB replica set.

Constant Summary

REPL_SET_OPTS =
[:read, :refresh_mode, :refresh_interval, :require_primary,
:read_secondary, :rs_name, :name]

Constants inherited from Connection

Connection::CONNECTION_OPTS, Connection::ConditionVariable, Connection::DEFAULT_PORT, Connection::GENERIC_OPTS, Connection::Mutex, Connection::TCPSocket

Constants included from Networking

Networking::BINARY_ENCODING, Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Constants included from Logging

Logging::DEBUG_LEVEL

Instance Attribute Summary (collapse)

Attributes inherited from Connection

#auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #safe, #size, #socket_class

Instance Method Summary (collapse)

Methods inherited from Connection

#[], #active?, #add_auth, #apply_saved_authentication, #best_available_socket, #checkin, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #lock!, #locked?, multi, #ping, #remove_auth, #server_info, #server_version, #unlock!

Methods included from Networking

#new_binary_string, #receive_message, #send_message, #send_message_with_safe_check

Methods included from Logging

#instrument, #log, #write_logging_startup_message

Constructor Details

- (ReplSetConnection) initialize(*args)

Create a connection to a MongoDB replica set.

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and Connection#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

Note: that the number of seed nodes does not have to be equal to the number of replica set members. The purpose of seed nodes is to permit the driver to find at least one replica set member even if a member is down.

Examples:

Connect to a replica set and provide two seed nodes.

ReplSetConnection.new(['localhost:30000', 'localhost:30001'])

Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

ReplSetConnection.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

ReplSetConnection.new(['localhost:30000', 'localhost:30001'], :read => :secondary)

Parameters:

  • seeds (Array)

    “host:port” strings

  • opts (Hash)

    a customizable set of options

Raises:

  • (ReplicaSetConnectionError)

    This is raised if a replica set name is specified and the driver fails to connect to a replica set with that name.

See Also:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/mongo/repl_set_connection.rb', line 83

def initialize(*args)
  if args.last.is_a?(Hash)
    opts = args.pop
  else
    opts = {}
  end

  unless args.length > 0
    raise MongoArgumentError, "A ReplSetConnection requires at least one seed node."
  end

  # This is temporary until support for the old format is dropped
  @seeds = []
  if args.first.last.is_a?(Integer)
    warn "Initiating a ReplSetConnection with seeds passed as individual [host, port] array arguments is deprecated."
    warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
    @seeds = args
  else
    args.first.map do |host_port|
      seed = host_port.split(":")
      seed[1] = seed[1].to_i
      seeds << seed
    end
  end

  # TODO: add a method for replacing this list of node.
  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil
  @old_managers = []

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  check_opts(opts)
  setup(opts)
end

Instance Attribute Details

- (Object) refresh_interval (readonly)

Returns the value of attribute refresh_interval



27
28
29
# File 'lib/mongo/repl_set_connection.rb', line 27

def refresh_interval
  @refresh_interval
end

- (Object) refresh_mode (readonly)

Returns the value of attribute refresh_mode



27
28
29
# File 'lib/mongo/repl_set_connection.rb', line 27

def refresh_mode
  @refresh_mode
end

- (Object) refresh_version (readonly)

Returns the value of attribute refresh_version



27
28
29
# File 'lib/mongo/repl_set_connection.rb', line 27

def refresh_version
  @refresh_version
end

- (Object) replica_set_name (readonly)

Returns the value of attribute replica_set_name



27
28
29
# File 'lib/mongo/repl_set_connection.rb', line 27

def replica_set_name
  @replica_set_name
end

- (Object) seeds (readonly)

Returns the value of attribute seeds



27
28
29
# File 'lib/mongo/repl_set_connection.rb', line 27

def seeds
  @seeds
end

Instance Method Details

- (Object) arbiters



425
426
427
# File 'lib/mongo/repl_set_connection.rb', line 425

def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end

- (Object) authenticate_pools



289
290
291
292
293
294
295
296
# File 'lib/mongo/repl_set_connection.rb', line 289

def authenticate_pools
  if primary_pool
    primary_pool.authenticate_existing
  end
  secondary_pools.each do |pool|
    pool.authenticate_existing
  end
end

- (Object) checkin_reader(socket)

Checkin a socket used for reading.



374
375
376
377
378
379
# File 'lib/mongo/repl_set_connection.rb', line 374

def checkin_reader(socket)
  if socket
    socket.pool.checkin(socket)
  end
  sync_refresh
end

- (Object) checkin_writer(socket)

Checkin a socket used for writing.



382
383
384
385
386
387
# File 'lib/mongo/repl_set_connection.rb', line 382

def checkin_writer(socket)
  if socket
    socket.pool.checkin(socket)
  end
  sync_refresh
end

- (Object) checkout(&block)

Generic socket checkout Takes a block that returns a socket from pool



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/mongo/repl_set_connection.rb', line 309

def checkout(&block)
  if connected?
    sync_refresh
  else
    connect
  end
  
  begin
    socket = block.call
  rescue => ex
    checkin(socket) if socket
    raise ex
  end
  
  if socket
    socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
end

- (Object) checkout_best

Checkout best available socket by trying primary pool first and then falling back to secondary.



333
334
335
336
337
338
339
340
341
342
# File 'lib/mongo/repl_set_connection.rb', line 333

def checkout_best
  checkout do
    socket = get_socket_from_pool(:primary)
    if !socket
      connect
      socket = get_socket_from_pool(:secondary)
    end
    socket
  end
end

- (Object) checkout_reader

Checkout a socket for reading (i.e., a secondary node). Note that @read_pool might point to the primary pool if no read pool has been defined.



347
348
349
350
351
352
353
354
355
356
# File 'lib/mongo/repl_set_connection.rb', line 347

def checkout_reader
  checkout do
    socket = get_socket_from_pool(:read)
    if !socket
      connect
      socket = get_socket_from_pool(:primary)
    end
    socket
  end
end

- (Object) checkout_secondary

Checkout a socket from a secondary For :read_preference => :secondary_only



360
361
362
363
364
# File 'lib/mongo/repl_set_connection.rb', line 360

def checkout_secondary
  checkout do
    get_socket_from_pool(:secondary)
  end
end

- (Object) checkout_writer

Checkout a socket for writing (i.e., a primary node).



367
368
369
370
371
# File 'lib/mongo/repl_set_connection.rb', line 367

def checkout_writer
  checkout do
    get_socket_from_pool(:primary)
  end
end

- (Object) close(opts = {})

Close the connection to the database.



260
261
262
263
264
265
266
267
# File 'lib/mongo/repl_set_connection.rb', line 260

def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end
  @connected = false
end

- (Object) close_socket(socket)



389
390
391
392
393
394
395
# File 'lib/mongo/repl_set_connection.rb', line 389

def close_socket(socket)
  begin
    socket.close if socket
  rescue IOError
    log(:info, "Tried to close socket #{socket} but already closed.")
  end
end

- (Object) connect

Initiate a connection to the replica set.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/mongo/repl_set_connection.rb', line 145

def connect
  log(:info, "Connecting...")
  @connect_mutex.synchronize do
    return if @connected

    discovered_seeds = @manager ? @manager.seeds : []
    @manager = PoolManager.new(self, discovered_seeds)
    
    Thread.current[:manager] = @manager

    @manager.connect
    @refresh_version += 1

    if @require_primary && @manager.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
      close
      raise ConnectionFailure, "Failed to connect to primary node."
    elsif @manager.read_pool.nil?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    else
      @connected = true
    end
  end
end

- (Boolean) connected?

Returns:

  • (Boolean)


216
217
218
# File 'lib/mongo/repl_set_connection.rb', line 216

def connected?
  @connected && (@manager.primary_pool || @manager.read_pool)
end

- (Boolean) connecting?

Deprecated.

Returns:

  • (Boolean)


221
222
223
224
# File 'lib/mongo/repl_set_connection.rb', line 221

def connecting?
  warn "ReplSetConnection#connecting? is deprecated and will be removed in v2.0."
  false
end

- (Object) get_socket_from_pool(pool_type)



397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/mongo/repl_set_connection.rb', line 397

def get_socket_from_pool(pool_type)
  if Thread.current[:manager] != @manager
    Thread.current[:manager] = @manager
  end
  
  pool = case pool_type
    when :primary
      primary_pool
    when :secondary
      secondary_pool
    when :read
      read_pool
  end

  begin
    if pool
      pool.checkout
    end
  rescue ConnectionFailure => ex
    log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
    return nil
  end
end

- (Boolean) hard_refresh!

Force a hard refresh of this connection's view of the replica set.

Returns:

  • (Boolean)

    true if hard refresh occurred. false is returned when unable to get the refresh lock.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/mongo/repl_set_connection.rb', line 200

def hard_refresh!
  log(:info, "Initiating hard refresh...")
  discovered_seeds = @manager ? @manager.seeds : []
  new_manager = PoolManager.new(self, discovered_seeds | @seeds)
  new_manager.connect

  Thread.current[:manager] = new_manager

  # TODO: make sure that connect has succeeded
  @old_managers << @manager
  @manager = new_manager

  @refresh_version += 1
  return true
end

- (String) host

The replica set primary's host name.

Returns:



229
230
231
# File 'lib/mongo/repl_set_connection.rb', line 229

def host
  @manager.primary_pool.host
end

- (Object) hosts



438
439
440
# File 'lib/mongo/repl_set_connection.rb', line 438

def hosts
  local_manager ? local_manager.hosts : []
end

- (Object) inspect



139
140
141
142
# File 'lib/mongo/repl_set_connection.rb', line 139

def inspect
  "<Mongo::ReplSetConnection:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end

- (Object) local_manager



421
422
423
# File 'lib/mongo/repl_set_connection.rb', line 421

def local_manager
  Thread.current[:manager]
end

- (Object) logout_pools(db)



298
299
300
301
302
303
304
305
# File 'lib/mongo/repl_set_connection.rb', line 298

def logout_pools(db)
  if primary_pool
    primary_pool.logout_existing(db)
  end
  secondary_pools.each do |pool|
    pool.logout_existing(db)
  end
end

- (Object) max_bson_size



462
463
464
465
466
467
468
# File 'lib/mongo/repl_set_connection.rb', line 462

def max_bson_size
  if local_manager && local_manager.max_bson_size
    local_manager.max_bson_size
  else
    Mongo::DEFAULT_MAX_BSON_SIZE
  end
end

- (Object) nodes



240
241
242
243
244
# File 'lib/mongo/repl_set_connection.rb', line 240

def nodes
  warn "ReplSetConnection#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use ReplSetConnection#seeds instead."
  @seeds
end

- (Integer) port

The replica set primary's port.

Returns:

  • (Integer)


236
237
238
# File 'lib/mongo/repl_set_connection.rb', line 236

def port
  @manager.primary_pool.port
end

- (Object) primary



429
430
431
# File 'lib/mongo/repl_set_connection.rb', line 429

def primary
  local_manager ? local_manager.primary : nil
end

- (Object) primary_pool



442
443
444
# File 'lib/mongo/repl_set_connection.rb', line 442

def primary_pool
  local_manager ? local_manager.primary_pool : nil
end

- (Object) read_pool



446
447
448
# File 'lib/mongo/repl_set_connection.rb', line 446

def read_pool
  local_manager ? local_manager.read_pool : nil
end

- (Object) read_preference



255
256
257
# File 'lib/mongo/repl_set_connection.rb', line 255

def read_preference
  @read
end

- (Boolean) read_primary? Also known as: primary?

Determine whether we're reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

Returns:

  • (Boolean)


250
251
252
# File 'lib/mongo/repl_set_connection.rb', line 250

def read_primary?
  @manager.read_pool == @manager.primary_pool
end

- (Boolean) refresh(opts = {})

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running ReplSetConnection#hard_refresh!

Returns:

  • (Boolean)

    true unless a hard refresh is run and the refresh lock can't be acquired.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/mongo/repl_set_connection.rb', line 177

def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end

- (Object) reset_connection

Deprecated.

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.



272
273
274
275
276
# File 'lib/mongo/repl_set_connection.rb', line 272

def reset_connection
  close
  warn "ReplSetConnection#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use ReplSetConnection#close instead."
end

- (Object) secondaries

Note: might want to freeze these after connecting.



434
435
436
# File 'lib/mongo/repl_set_connection.rb', line 434

def secondaries
  local_manager ? local_manager.secondaries : []
end

- (Object) secondary_pool



450
451
452
# File 'lib/mongo/repl_set_connection.rb', line 450

def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end

- (Object) secondary_pools



454
455
456
# File 'lib/mongo/repl_set_connection.rb', line 454

def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end

- (Boolean) slave_ok?

Returns true if it's okay to read from a secondary node. Since this is a replica set, this must always be true.

This method exist primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    true



285
286
287
# File 'lib/mongo/repl_set_connection.rb', line 285

def slave_ok?
  true
end

- (Object) tag_map



458
459
460
# File 'lib/mongo/repl_set_connection.rb', line 458

def tag_map
  local_manager ? local_manager.tag_map : {}
end

- (Object) valid_opts



135
136
137
# File 'lib/mongo/repl_set_connection.rb', line 135

def valid_opts
  GENERIC_OPTS + REPL_SET_OPTS
end