class Sensu::Client::Socket

EventMachine connection handler for the Sensu client's socket.

The Sensu client listens on localhost, port 3030 (by default), for UDP and TCP traffic. This allows software running on the host to push check results (that may contain metrics) into Sensu, without needing to know anything about Sensu's internal implementation.

The socket only accepts 7-bit ASCII-encoded data.

Although the Sensu client accepts UDP and TCP traffic, you must be aware of the UDP protocol limitations. Any data you send over UDP must fit in a single datagram and you will not receive a response (no confirmation).

UDP Protocol

If the socket receives a message consisting only of the string "ping", optionally surrounded by whitespace, it will ignore it.

The socket assumes all other messages will contain a single, complete, JSON hash. The hash must be a valid JSON check result. Deserialization failures will be logged at the ERROR level by the Sensu client, but the sender of the invalid data will not be notified.
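
As a rough illustration of the UDP path, the sketch below builds a minimal check result and sends it as a single datagram. The helper names (build_check_result, send_over_udp), the check name, and the output text are hypothetical; the host and port are the defaults described above. Because no response is sent over UDP, the sender cannot tell whether the client accepted the data.

```ruby
require "json"
require "socket"

# Hypothetical helper: serialize a minimal check result as a JSON hash.
def build_check_result(name, output, status = 0)
  JSON.generate("name" => name, "output" => output, "status" => status)
end

# Hypothetical helper: send one message as a single UDP datagram.
# Returns the number of bytes sent; no reply is read because none is sent.
def send_over_udp(message, host = "127.0.0.1", port = 3030)
  socket = UDPSocket.new
  socket.send(message, 0, host, port)
ensure
  socket.close if socket
end

payload = build_check_result("app_heartbeat", "everything is fine")
# send_over_udp(payload) # uncomment on a host running the Sensu client
```

Keeping the payload small matters here, since the whole JSON hash has to fit in one datagram.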

TCP Protocol

If the socket receives a message consisting only of the string "ping", optionally surrounded by whitespace, it will respond with the message "pong".

The socket assumes any other stream will be a single, complete JSON hash. A deserialization failure will be logged at the WARN level by the Sensu client, which will respond with the message "invalid". An "ok" response indicates the Sensu client successfully received the JSON hash and will publish the check result.
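
The TCP exchange can be sketched from the sender's side as follows. This is a minimal example under stated assumptions: send_over_tcp is a hypothetical helper, and the host and port are the defaults described above. It relies on the client closing the connection after it responds, so reading until end-of-file returns the whole reply.

```ruby
require "socket"

# Hypothetical helper: write one message to the client socket over TCP and
# return the reply. The client closes the connection after responding, so
# reading until end-of-file collects the whole response.
def send_over_tcp(message, host = "127.0.0.1", port = 3030)
  TCPSocket.open(host, port) do |socket|
    socket.write(message)
    socket.read
  end
end

# On a host running the Sensu client:
# send_over_tcp("ping")
# send_over_tcp('{"name": "app_heartbeat", "output": "fine"}')
```

Per the protocol described above, the reply should be one of "pong", "ok", or "invalid".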

Streams can be of any length. The socket protocol does not require any headers; instead, the socket tries to parse everything it has been sent each time a chunk of data arrives. Once the JSON parses successfully, the Sensu client publishes the result. If WATCHDOG_DELAY (500 milliseconds by default) elapses after the most recent chunk of data was received and the buffer still has not parsed successfully, the client gives up on the sender, responds with "invalid", and closes the connection.
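
The header-free framing described above amounts to "append, then try to parse the whole buffer". The sketch below models only that idea. ChunkBuffer is a hypothetical class, and it uses Ruby's standard JSON library rather than Sensu::JSON, so error messages and edge cases may differ from the real client.

```ruby
require "json"

# Hypothetical stand-in for the accept-mode buffering described above.
class ChunkBuffer
  attr_reader :parse_error

  def initialize
    @buffer = String.new
    @parse_error = nil
  end

  # Append a chunk and try to parse everything received so far.
  # Returns the parsed hash, or nil if the buffer is not yet valid JSON.
  def push(chunk)
    @buffer << chunk
    JSON.parse(@buffer, symbolize_names: true)
  rescue JSON::ParserError => error
    @parse_error = error.message # kept so a watchdog could report it later
    nil
  end
end

buffer = ChunkBuffer.new
first = buffer.push('{"name": "app_heart')        # incomplete JSON so far
second = buffer.push('beat", "output": "fine"}')  # buffer now parses
```

In this model the first call returns nil and records a parse error, while the second returns the complete hash. Re-parsing the whole buffer on every chunk is simple but does repeated work for large streams.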

Constants

MODE_ACCEPT

ACCEPT mode. Append chunks of data to a buffer and test to see whether the buffer contents are valid JSON.

MODE_REJECT

REJECT mode. The client is no longer receiving data from the sender. Chunks of data are discarded in this mode because the connection is being closed.

PING_REQUEST

PING request string, identifying a connection ping request.

WATCHDOG_DELAY

The number of seconds that may elapse between chunks of data from a sender before it is considered dead and the connection is closed.

Attributes

logger[RW]
protocol[RW]
settings[RW]
transport[RW]

Public Instance Methods

cancel_watchdog()

Cancel the current connection watchdog.

# File lib/sensu/client/socket.rb, line 99
def cancel_watchdog
  if @watchdog
    @watchdog.cancel
  end
end

parse_check_result(data)

Parse a JSON check result. For UDP, immediately raise a parser error. For TCP, record parser errors, so the connection watchdog can report them.

@param [String] data to parse for a check result.

# File lib/sensu/client/socket.rb, line 181
def parse_check_result(data)
  begin
    check = Sensu::JSON.load(data)
    cancel_watchdog
    process_check_result(check)
  rescue Sensu::JSON::ParseError, ArgumentError => error
    if @protocol == :tcp
      @parse_error = error.to_s
    else
      raise error
    end
  end
end

post_init()

Initialize instance variables that will be used throughout the lifetime of the connection. This method is called when the network connection has been established, and immediately after responding to a sender.

# File lib/sensu/client/socket.rb, line 78
def post_init
  @protocol ||= :tcp
  @data_buffer = ""
  @parse_error = nil
  @watchdog = nil
  @mode = MODE_ACCEPT
end

process_check_result(check)

Process a check result. Set check result attribute defaults, validate the attributes, publish the check result to the Sensu transport, and respond to the sender with the message "ok".

@param [Hash] check result to be validated and published.
@raise [DataError] if check is invalid.

# File lib/sensu/client/socket.rb, line 168
def process_check_result(check)
  check[:status] ||= 0
  check[:executed] ||= Time.now.to_i
  validate_check_result(check)
  publish_check_result(check)
  respond("ok")
end
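
To make the defaulting step concrete, here is a small sketch that isolates it. The helper name apply_defaults and the injected timestamp are hypothetical and exist only so the behavior can be observed without a running client.

```ruby
# Hypothetical helper mirroring the two defaults set above.
def apply_defaults(check, now = Time.now.to_i)
  check[:status] ||= 0      # only fills in a missing (nil) status
  check[:executed] ||= now  # only fills in a missing (nil) timestamp
  check
end

with_defaults = apply_defaults({ :name => "app_heartbeat", :output => "fine" }, 1_500_000_000)
explicit = apply_defaults({ :name => "app_heartbeat", :output => "down", :status => 2 }, 1_500_000_000)
```

Because 0 is truthy in Ruby, an explicit status of 0 is left alone by `||=`, as is any other status the sender supplies.
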

process_data(data)

Process the data received. This method validates the data encoding, provides ping/pong functionality, and passes potential check results on for further processing.

@param [String] data to be processed.

# File lib/sensu/client/socket.rb, line 200
def process_data(data)
  if data.strip == PING_REQUEST
    @logger.debug("socket received ping")
    respond("pong")
  else
    @logger.debug("socket received data", :data => data)
    unless valid_utf8?(data)
      @logger.warn("data from socket is not a valid UTF-8 sequence, processing it anyways", :data => data)
    end
    begin
      parse_check_result(data)
    rescue => error
      @logger.error("failed to process check result from socket", {
        :data => data,
        :error => error.to_s
      })
      respond("invalid")
    end
  end
end

publish_check_result(check)

Publish a check result to the Sensu transport.

@param [Hash] check result.

# File lib/sensu/client/socket.rb, line 145
def publish_check_result(check)
  payload = {
    :client => @settings[:client][:name],
    :check => check.merge(:issued => Time.now.to_i)
  }
  payload[:signature] = @settings[:client][:signature] if @settings[:client][:signature]
  @logger.info("publishing check result", :payload => payload)
  @transport.publish(:direct, "results", Sensu::JSON.dump(payload)) do |info|
    if info[:error]
      @logger.error("failed to publish check result", {
        :payload => payload,
        :error => info[:error].to_s
      })
    end
  end
end
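
The shape of the published payload can be seen in isolation with the sketch below. It assumes plain hashes in place of the client's settings and uses the standard JSON library instead of Sensu::JSON; build_payload, the client name, and the signature value are hypothetical.

```ruby
require "json"

# Hypothetical helper: build the payload hash in the same shape as above,
# taking a plain hash instead of the client's @settings.
def build_payload(client_settings, check, issued = Time.now.to_i)
  payload = {
    :client => client_settings[:name],
    :check => check.merge(:issued => issued)
  }
  # The signature key is only present when the client defines one.
  payload[:signature] = client_settings[:signature] if client_settings[:signature]
  payload
end

unsigned = build_payload({ :name => "web-01" }, { :name => "app_heartbeat", :status => 0 }, 1_500_000_000)
signed = build_payload({ :name => "web-01", :signature => "s3cret" }, { :name => "app_heartbeat" }, 1_500_000_000)
serialized = JSON.generate(unsigned)
```

Note that merge returns a new hash, so the caller's check hash is not given an :issued key.
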

receive_data(data)

This method is called whenever data is received. For UDP, it is called only once and receives the complete datagram. For TCP, it may be called several times, so the data received is buffered. TCP connections require a watchdog.

@param [String] data received from the sender.

# File lib/sensu/client/socket.rb, line 241
def receive_data(data)
  unless @mode == MODE_REJECT
    case @protocol
    when :udp
      process_data(data)
    when :tcp
      if EM.reactor_running?
        reset_watchdog
      end
      @data_buffer << data
      process_data(@data_buffer)
    end
  end
end

reset_watchdog()

Reset (or start) the connection watchdog.

# File lib/sensu/client/socket.rb, line 106
def reset_watchdog
  cancel_watchdog
  @watchdog = EM::Timer.new(WATCHDOG_DELAY) do
    @mode = MODE_REJECT
    @logger.warn("discarding data buffer for sender and closing connection", {
      :data => @data_buffer,
      :parse_error => @parse_error
    })
    respond("invalid")
  end
end
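
The reset-on-every-chunk behavior can be modelled without EventMachine. The sketch below is not how the client implements it (the client uses an EM::Timer, as shown above); WatchdogDeadline is a hypothetical class that only tracks a deadline against caller-supplied times, which makes the timing rule easy to follow.

```ruby
# Hypothetical, timer-free model of the watchdog: it only tracks a deadline.
class WatchdogDeadline
  def initialize(delay)
    @delay = delay
    @deadline = nil
  end

  # Start or restart the countdown from the given time.
  def reset(now)
    @deadline = now + @delay
  end

  # Stop the countdown, as happens once a check result parses.
  def cancel
    @deadline = nil
  end

  # True when a countdown is running and has run out.
  def expired?(now)
    !@deadline.nil? && now >= @deadline
  end
end

watchdog = WatchdogDeadline.new(0.5)
watchdog.reset(10.0)                   # first chunk arrives at t = 10.0
still_open = !watchdog.expired?(10.4)  # within the delay
watchdog.reset(10.4)                   # second chunk pushes the deadline back
timed_out = watchdog.expired?(11.0)    # no further chunks arrived in time
```

The point of the model is that the delay is measured from the most recent chunk, not from the start of the connection.
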

respond(data)

For TCP, send a response to the sender and close the connection. For both protocols, call #post_init() to reset the connection state.

@param [String] data to send as a response.

# File lib/sensu/client/socket.rb, line 90
def respond(data)
  if @protocol == :tcp
    send_data(data)
    close_connection_after_writing
  end
  post_init
end

valid_utf8?(data)

Tests if the argument (data) is a valid UTF-8 sequence.

@param [String] data to be tested.

# File lib/sensu/client/socket.rb, line 224
def valid_utf8?(data)
  utf8_string_pattern = /\A([\x00-\x7f]|
                            [\xc2-\xdf][\x80-\xbf]|
                            \xe0[\xa0-\xbf][\x80-\xbf]|
                            [\xe1-\xef][\x80-\xbf]{2}|
                            \xf0[\x90-\xbf][\x80-\xbf]{2}|
                            [\xf1-\xf7][\x80-\xbf]{3})*\z/nx
  data = data.force_encoding('BINARY') if data.respond_to?(:force_encoding)
  return data =~ utf8_string_pattern
end
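
On Ruby 1.9 and later, a similar test can be written with the built-in String#valid_encoding?. The sketch below is an alternative, not the method the client uses: valid_utf8_bytes? is a hypothetical name, it returns true or false rather than a match position or nil, and its verdict may differ from the regular expression above for some edge-case byte sequences.

```ruby
# Hypothetical alternative to the regular expression above.
def valid_utf8_bytes?(data)
  # dup so the caller's string keeps its original encoding
  data.dup.force_encoding("UTF-8").valid_encoding?
end

ascii_ok = valid_utf8_bytes?("cpu 42".b)
multibyte_ok = valid_utf8_bytes?("caf\xC3\xA9".b) # complete two-byte sequence
truncated = valid_utf8_bytes?("caf\xC3".b)        # lead byte with no continuation
```

Here the first two strings are reported as valid and the truncated one as invalid.
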

validate_check_result(check)

Validate check result attributes.

@param [Hash] check result to validate.

# File lib/sensu/client/socket.rb, line 121
def validate_check_result(check)
  unless check[:name] =~ /\A[\w\.-]+\z/
    raise DataError, "check name must be a string and cannot contain spaces or special characters"
  end
  unless check[:source].nil? || check[:source] =~ /\A[\w\.-]+\z/
    raise DataError, "check source must be a string and cannot contain spaces or special characters"
  end
  unless check[:output].is_a?(String)
    raise DataError, "check output must be a string"
  end
  unless check[:status].is_a?(Integer)
    raise DataError, "check status must be an integer"
  end
  unless check[:executed].is_a?(Integer)
    raise DataError, "check executed timestamp must be an integer"
  end
  unless check[:ttl].nil? || (check[:ttl].is_a?(Integer) && check[:ttl] > 0)
    raise DataError, "check ttl must be an integer greater than 0"
  end
end
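
To show what the name and source pattern accepts, the sketch below applies the same regular expression to a few sample values. The helper acceptable_name? and the sample names are hypothetical.

```ruby
# The pattern used above for check names and sources.
NAME_PATTERN = /\A[\w\.-]+\z/

# Hypothetical helper: does this value pass the name check?
def acceptable_name?(value)
  !!(value =~ NAME_PATTERN)
end

candidates = ["app_heartbeat", "disk.usage-01", "disk usage", "cpu%", "", nil]
accepted = candidates.select { |candidate| acceptable_name?(candidate) }
```

Only the first two candidates pass: letters, digits, underscores, dots, and hyphens are allowed, while spaces, other punctuation, an empty string, and a missing name are rejected.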