class MCollective::Client

Helpers for writing clients that can talk to agents, do discovery and so forth

Attributes

connection_timeout[RW]
discoverer[RW]
options[RW]
stats[RW]

Public Class Methods

new(options) click to toggle source
# File lib/mcollective/client.rb, line 6
def initialize(options)
  @config = Config.instance
  @options = nil

  if options.is_a?(String)
    # String is the path to a config file
    @config.loadconfig(options) unless @config.configured
  elsif options.is_a?(Hash)
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @subscriptions = {}

  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified
  # connection_timeout defaults to nil which means it will try forever if
  # not specified
  begin
    Timeout::timeout(@connection_timeout, ClientTimeoutError) do
      @connection.connect
    end
  rescue ClientTimeoutError => e
    Log.error("Timeout occured while trying to connect to middleware")
    raise e
  end
end
request_sequence() click to toggle source
# File lib/mcollective/client.rb, line 45
def self.request_sequence
  @@request_sequence
end

Public Instance Methods

collective() click to toggle source

Returns the configured main collective if no specific collective is specified as options

# File lib/mcollective/client.rb, line 51
def collective
  if @options[:collective].nil?
    @config.main_collective
  else
    @options[:collective]
  end
end
createreq(msg, agent, filter ={}) click to toggle source
# File lib/mcollective/client.rb, line 73
def createreq(msg, agent, filter ={})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  @@request_sequence += 1

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end
disconnect() click to toggle source

Disconnects cleanly from the middleware

# File lib/mcollective/client.rb, line 60
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end
discover(filter, timeout, limit=0) click to toggle source

Performs a discovery of nodes matching the filter passed returns an array of nodes

An integer limit can be supplied this will have the effect of the discovery being cancelled soon as it reached the requested limit of hosts

# File lib/mcollective/client.rb, line 147
def discover(filter, timeout, limit=0)
  @discoverer.discover(filter.merge({'collective' => collective}), timeout, limit)
end
discovered_req(body, agent, options=false) click to toggle source
# File lib/mcollective/client.rb, line 300
def discovered_req(body, agent, options=false)
  raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end
display_stats(stats, options=false, caption="stomp call summary") click to toggle source

Prints out the stats returns from req and #discovered_req in a nice way

# File lib/mcollective/client.rb, line 305
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  if stats[:noresponsefrom].size > 0
    puts("\nNo response from:\n")

    stats[:noresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end

  if stats[:unexpectedresponsefrom].size > 0
    puts("\nUnexpected response from:\n")

    stats[:unexpectedresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end
end
publish(request) click to toggle source
# File lib/mcollective/client.rb, line 231
def publish(request)
  Log.info("Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'")
  request.publish
end
receive(requestid = nil) click to toggle source

Blocking call that waits for ever for a message to arrive.

If you give it a requestid this means you've previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.

# File lib/mcollective/client.rb, line 115
def receive(requestid = nil)
  reply = nil

  begin
    reply = @connection.receive
    reply.type = :reply
    reply.expected_msgid = requestid

    reply.decode!

    unless reply.requestid == requestid
      raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
    end

    Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")
  rescue SecurityValidationFailed => e
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID => e
    Log.debug("Ignoring a message for some other client : #{e.message}")
    retry
  end

  reply
end
req(body, agent=nil, options=false, waitfor=[], &block) click to toggle source

Send a request, performs the passed block for each response

times = req(“status”, “mcollectived”, options, client) {|resp|

pp resp

}

It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser

# File lib/mcollective/client.rb, line 159
def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  hosts_responded = 0

  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt => e
  ensure
    unsubscribe(agent, :reply)
  end

  return update_stat(stat, hosts_responded, request.requestid)
end
sendreq(msg, agent, filter = {}) click to toggle source

Sends a request and returns the generated request id, doesn't wait for responses and doesn't execute any passed in code blocks for responses

# File lib/mcollective/client.rb, line 67
def sendreq(msg, agent, filter = {})
  request = createreq(msg, agent, filter)
  publish(request)
  request.requestid
end
start_publisher(request, publish_timeout) click to toggle source

Starts the request publishing routine

# File lib/mcollective/client.rb, line 220
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
  begin
    Timeout.timeout(publish_timeout) do
      publish(request)
    end
  rescue Timeout::Error => e
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end
start_receiver(requestid, waitfor, timeout) { |payload, resp| ... } click to toggle source

Starts the response receiver routine Expected to return the amount of received responses.

# File lib/mcollective/client.rb, line 238
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0

  if (waitfor.is_a?(Array))
    unfinished = Hash.new(0)
    waitfor.each {|w| unfinished[w] += 1}
  else
    unfinished = []
  end

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        if block.arity == 2
          yield resp.payload, resp
        else
          yield resp.payload
        end

        hosts_responded += 1

        if (waitfor.is_a?(Array))
          sender = resp.payload[:senderid]
          if unfinished[sender] <= 1
            unfinished.delete(sender)
          else
            unfinished[sender] -= 1
          end

          break if !waitfor.empty? && unfinished.empty?
        else
          break unless waitfor == 0 || hosts_responded < waitfor
        end
      end
    end
  rescue Timeout::Error => e
    if waitfor.is_a?(Array)
      if !unfinished.empty?
        Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}")
      end
    elsif (waitfor > hosts_responded)
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end
subscribe(agent, type) click to toggle source
# File lib/mcollective/client.rb, line 90
def subscribe(agent, type)
  unless @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Subscribing to #{type} target for agent #{agent}")

    Util.subscribe(subscription)
    @subscriptions[agent] = 1
  end
end
threaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher in threads. This is activated when the 'threader_client' configuration option is set.

# File lib/mcollective/client.rb, line 199
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  publisher = Thread.new do
    start_publisher(request, publish_timeout)
  end

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout
  hosts_responded = 0

  receiver = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
  end

  receiver.join
  hosts_responded
end
unsubscribe(agent, type) click to toggle source
# File lib/mcollective/client.rb, line 100
def unsubscribe(agent, type)
  if @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Unsubscribing #{type} target for #{agent}")

    Util.unsubscribe(subscription)
    @subscriptions.delete(agent)
  end
end
unthreaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher unthreaded. This is the default client behaviour.

# File lib/mcollective/client.rb, line 191
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end
update_stat(stat, hosts_responded, requestid) click to toggle source
# File lib/mcollective/client.rb, line 289
def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:unexpectedresponsefrom] = []
  stat[:requestid] = requestid

  @stats = stat
end