class Fluent::EngineClass

Constants

LOG_EMIT_INTERVAL
MATCH_CACHE_SIZE

Attributes

matches[R]
msgpack_factory[R]
root_agent[R]
sources[R]
system_config[R]

Public Class Methods

new() click to toggle source
# File lib/fluent/engine.rb, line 42
def initialize
  @root_agent = nil
  @event_router = nil
  @default_loop = nil
  @engine_stopped = false

  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []

  @suppress_config_dump = false

  @msgpack_factory = DummyMessagePackFactory.new
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/engine.rb, line 123
def configure(conf)
  # plugins / configuration dumps
  Gem::Specification.find_all.select{|x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/}.each do |spec|
    $log.info "gem '#{spec.name}' version '#{spec.version}'"
  end

  @root_agent.configure(conf)
  @event_router = @root_agent.event_router

  unless @suppress_config_dump
    $log.info "using configuration file: #{conf.to_s.rstrip}"
  end
end
emit(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 141
def emit(tag, time, record)
  unless record.nil?
    emit_stream tag, OneEventStream.new(time, record)
  end
end
emit_array(tag, array) click to toggle source
# File lib/fluent/engine.rb, line 147
def emit_array(tag, array)
  emit_stream tag, ArrayEventStream.new(array)
end
emit_stream(tag, es) click to toggle source
# File lib/fluent/engine.rb, line 151
def emit_stream(tag, es)
  @event_router.emit_stream(tag, es)
end
flush!() click to toggle source
# File lib/fluent/engine.rb, line 155
def flush!
  @root_agent.flush!
end
init(system_config) click to toggle source
# File lib/fluent/engine.rb, line 65
def init(system_config)
  @system_config = system_config

  BasicSocket.do_not_reverse_lookup = true
  Plugin.load_plugins
  if defined?(Encoding)
    Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
    Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
  end

  suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
  @suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
  @without_source = system_config.without_source unless system_config.without_source.nil?

  @root_agent = RootAgent.new(@system_config)

  self
end
load_plugin_dir(dir) click to toggle source
# File lib/fluent/engine.rb, line 137
def load_plugin_dir(dir)
  Plugin.load_plugin_dir(dir)
end
log() click to toggle source
# File lib/fluent/engine.rb, line 84
def log
  $log
end
log_event_loop() click to toggle source
# File lib/fluent/engine.rb, line 164
def log_event_loop
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # NOTE: thead-safe of slice! depends on GVL
    events = @log_event_queue.slice!(0..-1)
    next if events.empty?

    events.each {|tag,time,record|
      begin
        @event_router.emit(tag, time, record)
      rescue => e
        $log.error "failed to emit fluentd's log event", tag: tag, event: record, error_class: e.class, error: e
      end
    }
  end
end
now() click to toggle source
# File lib/fluent/engine.rb, line 159
def now
  # TODO thread update
  Time.now.to_i
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false) click to toggle source
# File lib/fluent/engine.rb, line 93
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  if fname =~ /\.rb$/
    require 'fluent/config/dsl'
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end
push_log_event(tag, time, record) click to toggle source
# File lib/fluent/engine.rb, line 229
def push_log_event(tag, time, record)
  return if @log_emit_thread.nil?
  @log_event_queue.push([tag, time, record])
end
run() click to toggle source
# File lib/fluent/engine.rb, line 185
def run
  begin
    start

    if @event_router.match?($log.tag)
      $log.enable_event
      @log_emit_thread = Thread.new(&method(:log_event_loop))
    end

    unless @engine_stopped
      # for empty loop
      @default_loop = Coolio::Loop.default
      @default_loop.attach Coolio::TimerWatcher.new(1, true)
      # TODO attach async watch for thread pool
      @default_loop.run
    end

    if @engine_stopped and @default_loop
      @default_loop.stop
      @default_loop = nil
    end

  rescue => e
    $log.error "unexpected error", error_class: e.class, error: e
    $log.error_backtrace
  ensure
    $log.info "shutting down fluentd"
    shutdown
    if @log_emit_thread
      @log_event_loop_stop = true
      @log_emit_thread.join
    end
  end
end
run_configure(conf) click to toggle source
# File lib/fluent/engine.rb, line 102
def run_configure(conf)
  configure(conf)
  conf.check_not_fetched { |key, e|
    parent_name, plugin_name = e.unused_in
    if parent_name
      message = if plugin_name
                  "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
                else
                  "section <#{e.name}> is not used in <#{parent_name}>"
                end
      $log.warn message
      next
    end
    unless e.name == 'system'
      unless @without_source && e.name == 'source'
        $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
      end
    end
  }
end
stop() click to toggle source
# File lib/fluent/engine.rb, line 220
def stop
  @engine_stopped = true
  if @default_loop
    @default_loop.stop
    @default_loop = nil
  end
  nil
end
suppress_interval(interval_time) click to toggle source
# File lib/fluent/engine.rb, line 88
def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = Time.now.to_i
end

Private Instance Methods

shutdown() click to toggle source
# File lib/fluent/engine.rb, line 240
def shutdown
  @root_agent.shutdown
end
start() click to toggle source
# File lib/fluent/engine.rb, line 236
def start
  @root_agent.start
end