TODO: use %i() after drop ruby v1.9.3 support.
get nexted plugins (such as <store> of the copy plugin) from the plugin `pe` recursively
# File lib/fluent/plugin/in_monitor_agent.rb, line 342 def self.collect_children(pe, array=[]) array << pe if pe.is_a?(MultiOutput) && pe.respond_to?(:outputs) pe.outputs.each {|nop| collect_children(nop, array) } end array end
# File lib/fluent/plugin/in_monitor_agent.rb, line 317 def all_plugins array = [] # get all input plugins array.concat Engine.root_agent.inputs # get all output plugins Engine.root_agent.outputs.each { |o| MonitorAgentInput.collect_children(o, array) } # get all filter plugins Engine.root_agent.filters.each { |f| MonitorAgentInput.collect_children(f, array) } Engine.root_agent.labels.each { |name, l| # TODO: Add label name to outputs / filters for identifing plugins l.outputs.each { |o| MonitorAgentInput.collect_children(o, array) } l.filters.each { |f| MonitorAgentInput.collect_children(f, array) } } array end
# File lib/fluent/plugin/in_monitor_agent.rb, line 467 def fluentd_opts @fluentd_opts ||= get_fluentd_opts end
# File lib/fluent/plugin/in_monitor_agent.rb, line 471 def get_fluentd_opts opts = {} ObjectSpace.each_object(Fluent::Supervisor) { |obj| opts.merge!(obj.options) break } opts end
get monitor info from the plugin `pe` and return a hash object
# File lib/fluent/plugin/in_monitor_agent.rb, line 400 def get_monitor_info(pe, opts={}) obj = {} # Common plugin information obj['plugin_id'] = pe.plugin_id obj['plugin_category'] = plugin_category(pe) obj['type'] = pe.config['@type'] || pe.config['type'] || Fluent::Plugin.lookup_name_from_class(pe.class) obj['config'] = pe.config if opts[:with_config] # run MONITOR_INFO in plugins' instance context and store the info to obj MONITOR_INFO.each_pair {|key,code| begin obj[key] = pe.instance_eval(code) rescue end } if opts[:with_retry] num_errors = pe.instance_variable_get(:@num_errors) if num_errors obj['retry'] = num_errors.zero? ? EMPTY_RESULT : get_retry_info(pe, num_errors) end end # include all instance variables if :with_debug_info is set if opts[:with_debug_info] iv = {} pe.instance_eval do instance_variables.each {|sym| next if IGNORE_ATTRIBUTES.include?(sym) key = sym.to_s[1..-1] # removes first '@' iv[key] = instance_variable_get(sym) } end obj['instance_variables'] = iv elsif ivars = opts[:ivars] iv = {} ivars.each {|name| iname = "@#{name}" iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname) } obj['instance_variables'] = iv end obj end
# File lib/fluent/plugin/in_monitor_agent.rb, line 447 def get_retry_info(pe, num_errors) retry_variables = {} retry_variables['steps'] = num_errors retry_variables['next_time'] = Time.at(pe.instance_variable_get('@next_retry_time'.freeze)) retry_variables end
# File lib/fluent/plugin/in_monitor_agent.rb, line 454 def plugin_category(pe) case pe when Fluent::Input 'input'.freeze when Fluent::Output 'output'.freeze when Fluent::Filter 'filter'.freeze else 'unknown'.freeze end end
search a plugin by plugin_id
# File lib/fluent/plugin/in_monitor_agent.rb, line 367 def plugin_info_by_id(plugin_id, opts={}) found = all_plugins.find {|pe| pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id } if found get_monitor_info(found, opts) else nil end end
try to match the tag and get the info from the matched output plugin TODO: Support output in label
# File lib/fluent/plugin/in_monitor_agent.rb, line 354 def plugin_info_by_tag(tag, opts={}) matches = Engine.root_agent.event_router.instance_variable_get(:@match_rules) matches.each { |rule| if rule.match?(tag) if rule.collector.is_a?(Output) return get_monitor_info(rule.collector, opts) end end } nil end
# File lib/fluent/plugin/in_monitor_agent.rb, line 389 def plugins_info_all(opts={}) all_plugins.map {|pe| get_monitor_info(pe, opts) } end
This method returns an array because multiple plugins could have the same type
# File lib/fluent/plugin/in_monitor_agent.rb, line 380 def plugins_info_by_type(type, opts={}) array = all_plugins.select {|pe| (pe.config['@type'] == type || pe.config['type'] == type || Fluent::Plugin.lookup_name_from_class(pe.class)) rescue nil } array.map {|pe| get_monitor_info(pe, opts) } end
# File lib/fluent/plugin/in_monitor_agent.rb, line 285 def run @loop.run rescue => e log.error "unexpected error", error: e.to_s log.error_backtrace end
# File lib/fluent/plugin/in_monitor_agent.rb, line 292 def shutdown if @srv @srv.shutdown @srv = nil end if @thread @thread.join @thread = nil end if @tag @loop.watchers.each { |w| w.detach } @loop.stop @loop = nil @thread_for_emit.join @thread_for_emit = nil end end
# File lib/fluent/plugin/in_monitor_agent.rb, line 252 def start log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" @srv = WEBrick::HTTPServer.new({ BindAddress: @bind, Port: @port, Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), AccessLog: [], }) @srv.mount('/api/plugins', LTSVMonitorServlet, self) @srv.mount('/api/plugins.json', JSONMonitorServlet, self) @srv.mount('/api/config', LTSVConfigMonitorServlet, self) @srv.mount('/api/config.json', JSONConfigMonitorServlet, self) @thread = Thread.new { @srv.start } if @tag log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'" @loop = Coolio::Loop.new opts = {with_config: @emit_config, with_retry: false} timer = TimerWatcher.new(@emit_interval, log) { es = MultiEventStream.new now = Engine.now plugins_info_all(opts).each { |record| es.add(now, record) } router.emit_stream(@tag, es) } @loop.attach(timer) @thread_for_emit = Thread.new(&method(:run)) end end