diff --git a/lib/fluent/plugin/filter_flowcounter_simple.rb b/lib/fluent/plugin/filter_flowcounter_simple.rb index 4230d2a..6df7bb8 100644 --- a/lib/fluent/plugin/filter_flowcounter_simple.rb +++ b/lib/fluent/plugin/filter_flowcounter_simple.rb @@ -1,21 +1,12 @@ -# Be lazy to to implement filter plugin, use output plugin instance -require_relative 'out_flowcounter_simple' -require 'forwardable' +require_relative 'flowcounter_simple' class Fluent::FlowCounterSimpleFilter < Fluent::Filter Fluent::Plugin.register_filter('flowcounter_simple', self) - extend Forwardable - attr_reader :output - def_delegators :@output, :configure, :start, :shutdown, :flush_emit - - def initialize - super - @output = Fluent::FlowCounterSimpleOutput.new - end + include ::Fluent::FlowcounterSimple def filter_stream(tag, es) - @output.emit(tag, es, Fluent::NullOutputChain.instance) + process(tag, es) es end end if defined?(Fluent::Filter) diff --git a/lib/fluent/plugin/flowcounter_simple.rb b/lib/fluent/plugin/flowcounter_simple.rb new file mode 100644 index 0000000..026dc62 --- /dev/null +++ b/lib/fluent/plugin/flowcounter_simple.rb @@ -0,0 +1,101 @@ +module Fluent + module FlowcounterSimple + attr_accessor :last_checked + + def self.included(klass) + klass.config_param :indicator, :string, :default => 'num' + klass.config_param :unit, :string, :default => 'second' + klass.config_param :comment, :string, :default => nil + end + + def configure(conf) + super + + @indicator_proc = + case @indicator + when 'num' then Proc.new {|record| 1 } + when 'byte' then Proc.new {|record| record.to_msgpack.size } + else + raise Fluent::ConfigError, "flowcounter-simple count allows num/byte" + end + @unit = + case @unit + when 'second' then :second + when 'minute' then :minute + when 'hour' then :hour + when 'day' then :day + else + raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day" + end + @tick = + case @unit + when :second then 1 + when :minute then 60 + when :hour then 3600 + when :day then 86400 + else + raise RuntimeError, "@unit must be one of second/minute/hour/day" + end + + @output_proc = + if @comment + Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" } + else + Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" } + end + + @count = 0 + @mutex = Mutex.new + end + + def start + super + start_watch + end + + def shutdown + super + @watcher.terminate + @watcher.join + end + + def countup(count) + @mutex.synchronize { + @count = (@count || 0) + count + } + end + + def flush_emit(step) + count, @count = @count, 0 + if count > 0 + log.info @output_proc.call(count) + end + end + + def start_watch + # for internal, or tests only + @watcher = Thread.new(&method(:watch)) + end + + def watch + # instance variable, and public accessable, for test + @last_checked = Fluent::Engine.now + while true + sleep 0.1 + if Fluent::Engine.now - @last_checked >= @tick + now = Fluent::Engine.now + flush_emit(now - @last_checked) + @last_checked = now + end + end + end + + def process(tag, es) + count = 0 + es.each {|time,record| + count += @indicator_proc.call(record) + } + countup(count) + end + end +end diff --git a/lib/fluent/plugin/out_flowcounter_simple.rb b/lib/fluent/plugin/out_flowcounter_simple.rb index 63a112f..c741e75 100644 --- a/lib/fluent/plugin/out_flowcounter_simple.rb +++ b/lib/fluent/plugin/out_flowcounter_simple.rb @@ -1,3 +1,5 @@ +require_relative 'flowcounter_simple' + class Fluent::FlowCounterSimpleOutput < Fluent::Output Fluent::Plugin.register_output('flowcounter_simple', self) @@ -6,101 +8,10 @@ class Fluent::FlowCounterSimpleOutput < Fluent::Output define_method("log") { $log } end - config_param :indicator, :string, :default => 'num' - config_param :unit, :string, :default => 'second' - config_param :comment, :string, :default => nil - - attr_accessor :last_checked - - def configure(conf) - super - - @indicator_proc = - case @indicator - when 'num' then Proc.new {|record| 1 } - when 'byte' then Proc.new {|record| record.to_msgpack.size } - else - raise Fluent::ConfigError, "flowcounter-simple count allows num/byte" - end - @unit = - case @unit - when 'second' then :second - when 'minute' then :minute - when 'hour' then :hour - when 'day' then :day - else - raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day" - end - @tick = - case @unit - when :second then 1 - when :minute then 60 - when :hour then 3600 - when :day then 86400 - else - raise RuntimeError, "@unit must be one of second/minute/hour/day" - end - - @output_proc = - if @comment - Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" } - else - Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" } - end - - @count = 0 - @mutex = Mutex.new - end - - def start - super - start_watch - end - - def shutdown - super - @watcher.terminate - @watcher.join - end - - def countup(count) - @mutex.synchronize { - @count = (@count || 0) + count - } - end - - def flush_emit(step) - count, @count = @count, 0 - if count > 0 - log.info @output_proc.call(count) - end - end - - def start_watch - # for internal, or tests only - @watcher = Thread.new(&method(:watch)) - end - - def watch - # instance variable, and public accessable, for test - @last_checked = Fluent::Engine.now - while true - sleep 0.1 - if Fluent::Engine.now - @last_checked >= @tick - now = Fluent::Engine.now - flush_emit(now - @last_checked) - @last_checked = now - end - end - end + include ::Fluent::FlowcounterSimple def emit(tag, es, chain) - count = 0 - es.each {|time,record| - count += @indicator_proc.call(record) - } - countup(count) - + process(tag, es) chain.next end end diff --git a/test/helper.rb b/test/helper.rb index 0b75b4a..cd2449c 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -17,11 +17,18 @@ class Test::Unit::TestCase def capture_log(log) - tmp = log.out - log.out = StringIO.new - yield - return log.out.string - ensure - log.out = tmp + if defined?(Fluent::Test::TestLogger) and log.is_a?(Fluent::Test::TestLogger) # v0.14 + yield + log.out.logs.join("\n") + else + begin + tmp = log.out + log.out = StringIO.new + yield + return log.out.string + ensure + log.out = tmp + end + end end end diff --git a/test/plugin/test_filter_flowcounter_simple.rb b/test/plugin/test_filter_flowcounter_simple.rb index 609090f..0ab6942 100644 --- a/test/plugin/test_filter_flowcounter_simple.rb +++ b/test/plugin/test_filter_flowcounter_simple.rb @@ -26,20 +26,20 @@ def test_filter d = create_driver filtered, out = filter(d, msgs) assert_equal msgs, filtered - assert( out.include?("count:20"), out ) + assert { out.include?("count:20") } end private def filter(d, msgs) - stub(d.instance.output).start - stub(d.instance.output).shutdown + stub(d.instance).start + stub(d.instance).shutdown d.run { msgs.each {|msg| d.filter(msg, @time) } } - out = capture_log(d.instance.output.log) do + out = capture_log(d.instance.log) do d.instance.flush_emit(0) end filtered = d.filtered_as_array diff --git a/test/plugin/test_out_flowcounter_simple.rb b/test/plugin/test_out_flowcounter_simple.rb index 17b680b..07a158e 100644 --- a/test/plugin/test_out_flowcounter_simple.rb +++ b/test/plugin/test_out_flowcounter_simple.rb @@ -15,16 +15,16 @@ def create_driver(conf=CONFIG,tag='test') def test_configure assert_nothing_raised { - d = create_driver('') + create_driver('') } assert_nothing_raised { - d = create_driver(CONFIG) + create_driver(CONFIG) } assert_nothing_raised { - d = create_driver(CONFIG + %[indicator num]) + create_driver(CONFIG + %[indicator num]) } assert_nothing_raised { - d = create_driver(CONFIG + %[indicator byte]) + create_driver(CONFIG + %[indicator byte]) } end @@ -38,7 +38,7 @@ def test_num end end out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) } - assert( out.include?("count:30"), out ) + assert { out.include?("count:30") } end def test_byte @@ -51,7 +51,7 @@ def test_byte end end out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) } - assert( out =~ /count:\d+\tindicator:byte\tunit:second/, out ) + assert { out =~ /count:\d+\tindicator:byte\tunit:second/ } end def test_comment @@ -64,6 +64,6 @@ def test_comment end end out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) } - assert( out.include?("comment:foobar"), out ) + assert { out.include?("comment:foobar") } end end