class Mongo::Cluster
Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.
@since 2.0.0
Constants
- CLUSTER_TIME
The cluster time key in responses from mongos servers.
@since 2.5.0
- IDLE_WRITE_PERIOD_SECONDS
How often an idle primary writes a no-op to the oplog.
@since 2.4.0
- MAX_READ_RETRIES
The default number of mongos read retries.
@since 2.1.1
- MAX_WRITE_RETRIES
The default number of mongos write retries.
@since 2.4.2
- READ_RETRY_INTERVAL
The default mongos read retry interval, in seconds.
@since 2.1.1
Attributes
@return [ Mongo::Cluster::AppMetadata ] The application metadata, used for connection
handshakes.
@since 2.4.0
@return [ BSON::Document ] The latest cluster time seen.
@since 2.5.0
@return [ Monitoring ] monitoring The monitoring.
@return [ Hash ] The options hash.
@private
@since 2.5.1
@return [ Object ] The cluster topology.
Public Class Methods
Create a cluster for the provided client, for use when we don't want the client's original cluster instance to be the same.
@api private
@example Create a cluster for the client.
Cluster.create(client)
@param [ Client ] client The client to create on.
@return [ Cluster ] The cluster.
@since 2.0.0
# File lib/mongo/cluster.rb, line 446 def self.create(client) cluster = Cluster.new( client.cluster.addresses.map(&:to_s), client.instance_variable_get(:@monitoring).dup, client.options ) client.instance_variable_set(:@cluster, cluster) end
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
@example Finalize the cluster.
Cluster.finalize(pools)
@param [ Hash<Address, Server::ConnectionPool> ] pools The connection pools. @param [ PeriodicExecutor ] periodic_executor The periodic executor. @param [ SessionPool ] #session_pool The session pool.
@return [ Proc ] The Finalizer.
@since 2.2.0
# File lib/mongo/cluster.rb, line 220 def self.finalize(pools, periodic_executor, session_pool) proc do session_pool.end_sessions periodic_executor.stop! pools.values.each do |pool| pool.disconnect! end end end
Instantiate the new cluster.
@api private
@example Instantiate the cluster.
Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)
@note Cluster should never be directly instantiated outside of a Client.
@param [ Array<String> ] seeds The addresses of the configured servers. @param [ Monitoring ] monitoring The monitoring. @param [ Hash ] options The options.
@since 2.0.0
# File lib/mongo/cluster.rb, line 169 def initialize(seeds, monitoring, options = Options::Redacted.new) @addresses = [] @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new @options = options.freeze @app_metadata = AppMetadata.new(self) @update_lock = Mutex.new @pool_lock = Mutex.new @cluster_time = nil @cluster_time_lock = Mutex.new @topology = Topology.initial(seeds, monitoring, options) Session::SessionPool.create(self) publish_sdam_event( Monitoring::TOPOLOGY_OPENING, Monitoring::Event::TopologyOpening.new(@topology) ) subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self)) seeds.each{ |seed| add(seed) } publish_sdam_event( Monitoring::TOPOLOGY_CHANGED, Monitoring::Event::TopologyChanged.new(@topology, @topology) ) if @servers.size > 1 @cursor_reaper = CursorReaper.new @socket_reaper = SocketReaper.new(self) @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper) @periodic_executor.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools, @periodic_executor, @session_pool)) end
Public Instance Methods
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
@example Is the cluster equal to the object?
cluster == other
@param [ Object ] other The object to compare to.
@return [ true, false ] If the objects are equal.
@since 2.0.0
# File lib/mongo/cluster.rb, line 98 def ==(other) return false unless other.is_a?(Cluster) addresses == other.addresses && options == other.options end
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.
@example Add the server for the address to the cluster.
cluster.add('127.0.0.1:27018')
@param [ String ] host The address of the server to add.
@return [ Server ] The newly added server, if not present already.
@since 2.0.0
# File lib/mongo/cluster.rb, line 115 def add(host) address = Address.new(host, options) if !addresses.include?(address) if addition_allowed?(address) @update_lock.synchronize { @addresses.push(address) } server = Server.new(address, self, @monitoring, event_listeners, options) @update_lock.synchronize { @servers.push(server) } server end end end
Add hosts in a description to the cluster.
@example Add hosts in a description to the cluster.
cluster.add_hosts(description)
@param [ Mongo::Server::Description ] description The description.
@since 2.0.6
# File lib/mongo/cluster.rb, line 411 def add_hosts(description) if topology.add_hosts?(description, servers_list) description.servers.each { |s| add(s) } end end
The addresses in the cluster.
@example Get the addresses in the cluster.
cluster.addresses
@return [ Array<Mongo::Address> ] The addresses.
@since 2.0.6
# File lib/mongo/cluster.rb, line 463 def addresses addresses_list end
Disconnect all servers.
@example Disconnect the cluster's servers.
cluster.disconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/cluster.rb, line 384 def disconnect! @periodic_executor.stop! @servers.each { |server| server.disconnect! } and true end
Elect a primary server from the description that has just changed to a primary.
@example Elect a primary server.
cluster.elect_primary!(description)
@param [ Server::Description ] description The newly elected primary.
@return [ Topology ] The cluster topology.
@since 2.0.0
# File lib/mongo/cluster.rb, line 268 def elect_primary!(description) @topology = topology.elect_primary(description, servers_list) end
Determine if the cluster would select a readable server for the provided read preference.
@example Is a readable server present?
topology.has_readable_server?(server_selector)
@param [ ServerSelector ] server_selector The server
selector.
@return [ true, false ] If a readable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 139 def has_readable_server?(server_selector = nil) topology.has_readable_server?(self, server_selector) end
Determine if the cluster would select a writable server.
@example Is a writable server present?
topology.has_writable_server?
@return [ true, false ] If a writable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 151 def has_writable_server? topology.has_writable_server?(self) end
Get the nicer formatted string for use in inspection.
@example Inspect the cluster.
cluster.inspect
@return [ String ] The cluster inspection.
@since 2.0.0
# File lib/mongo/cluster.rb, line 238 def inspect "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>" end
The logical session timeout value in minutes.
@example Get the logical session timeout in minutes.
cluster.logical_session_timeout
@return [ Integer, nil ] The logical session timeout.
@since 2.5.0
# File lib/mongo/cluster.rb, line 475 def logical_session_timeout servers.inject(nil) do |min, server| break unless timeout = server.logical_session_timeout [timeout, (min || timeout)].min end end
Get the maximum number of times the cluster can retry a read operation on a mongos.
@example Get the max read retries.
cluster.max_read_retries
@return [ Integer ] The maximum retries.
@since 2.1.1
# File lib/mongo/cluster.rb, line 281 def max_read_retries options[:max_read_retries] || MAX_READ_RETRIES end
Get the next primary server we can send an operation to.
@example Get the next primary server.
cluster.next_primary
@param [ true, false ] ping Whether to ping the server before selection.
@return [ Mongo::Server ] A primary server.
@since 2.0.0
# File lib/mongo/cluster.rb, line 252 def next_primary(ping = true) @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) @primary_selector.select_server(self, ping) end
Get the scoped connection pool for the server.
@example Get the connection pool.
cluster.pool(server)
@param [ Server ] server The server.
@return [ Server::ConnectionPool ] The connection pool.
@since 2.2.0
# File lib/mongo/cluster.rb, line 295 def pool(server) @pool_lock.synchronize do pools[server.address] ||= Server::ConnectionPool.get(server) end end
Get the interval, in seconds, in which a mongos read operation is retried.
@example Get the read retry interval.
cluster.read_retry_interval
@return [ Float ] The interval.
@since 2.1.1
# File lib/mongo/cluster.rb, line 310 def read_retry_interval options[:read_retry_interval] || READ_RETRY_INTERVAL end
Reconnect all servers.
@example Reconnect the cluster's servers.
cluster.reconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/cluster.rb, line 397 def reconnect! scan! servers.each { |server| server.reconnect! } @periodic_executor.restart! and true end
Remove the server from the cluster for the provided address, if it exists.
@example Remove the server from the cluster.
server.remove('127.0.0.1:27017')
@param [ String ] host The host/port or socket address.
@since 2.0.0
# File lib/mongo/cluster.rb, line 336 def remove(host) address = Address.new(host) removed_servers = @servers.select { |s| s.address == address } @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers publish_sdam_event( Monitoring::SERVER_CLOSED, Monitoring::Event::ServerClosed.new(address, topology) ) @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end
Remove hosts in a description from the cluster.
@example Remove hosts in a description from the cluster.
cluster.remove_hosts(description)
@param [ Mongo::Server::Description ] description The description.
@since 2.0.6
# File lib/mongo/cluster.rb, line 425 def remove_hosts(description) if topology.remove_hosts?(description) servers_list.each do |s| remove(s.address.to_s) if topology.remove_server?(description, s) end end end
Force a scan of all known servers in the cluster.
@example Force a full cluster scan.
cluster.scan!
@note This operation is done synchronously. If servers in the cluster are
down or slow to respond this can potentially be a slow operation.
@return [ true ] Always true.
@since 2.0.0
# File lib/mongo/cluster.rb, line 359 def scan! servers_list.each{ |server| server.scan! } and true end
Get a list of server candidates from the cluster that can have operations executed on them.
@example Get the server candidates for an operation.
cluster.servers
@return [ Array<Server> ] The candidate servers.
@since 2.0.0
# File lib/mongo/cluster.rb, line 372 def servers topology.servers(servers_list.compact).compact end
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
@example Notify the cluster that a standalone server was discovered.
cluster.standalone_discovered
@return [ Topology ] The cluster topology.
@since 2.0.6
# File lib/mongo/cluster.rb, line 323 def standalone_discovered @topology = topology.standalone_discovered end
Update the max cluster time seen in a response.
@example Update the cluster time.
cluster.update_cluster_time(result)
@param [ Operation::Result ] result The operation result containing the cluster time.
@return [ Object ] The cluster time.
@since 2.5.0
# File lib/mongo/cluster.rb, line 492 def update_cluster_time(result) if cluster_time_doc = result.cluster_time @cluster_time_lock.synchronize do if @cluster_time.nil? @cluster_time = cluster_time_doc elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME] @cluster_time = cluster_time_doc end end end end
Private Instance Methods
# File lib/mongo/cluster.rb, line 532 def addition_allowed?(address) !@topology.single? || direct_connection?(address) end
# File lib/mongo/cluster.rb, line 544 def addresses_list @update_lock.synchronize { @addresses.dup } end
# File lib/mongo/cluster.rb, line 528 def direct_connection?(address) address.seed == @topology.seed end
# File lib/mongo/cluster.rb, line 506 def get_session(options = {}) return options[:session].validate!(self) if options[:session] if sessions_supported? Session.new(@session_pool.checkout, self, { implicit: true }.merge(options)) end end
# File lib/mongo/cluster.rb, line 536 def pools @pools ||= {} end
# File lib/mongo/cluster.rb, line 540 def servers_list @update_lock.synchronize { @servers.dup } end
# File lib/mongo/cluster.rb, line 520 def sessions_supported? if servers.empty? && !topology.single? ServerSelector.get(mode: :primary_preferred).select_server(self) end !!logical_session_timeout rescue Error::NoServerAvailable end
# File lib/mongo/cluster.rb, line 513 def with_session(options = {}) session = get_session(options) yield(session) ensure session.end_session if (session && session.implicit?) end