Skip to content

Commit

Permalink
Merge pull request #4 from sonots/fix_v14
Browse files Browse the repository at this point in the history
Fix tests to pass with v0.14
  • Loading branch information
sonots committed Sep 15, 2017
2 parents b1fe08a + 00b427f commit 6f2b98c
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 122 deletions.
15 changes: 3 additions & 12 deletions lib/fluent/plugin/filter_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
# Be lazy to to implement filter plugin, use output plugin instance
require_relative 'out_flowcounter_simple'
require 'forwardable'
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleFilter < Fluent::Filter
Fluent::Plugin.register_filter('flowcounter_simple', self)

extend Forwardable
attr_reader :output
def_delegators :@output, :configure, :start, :shutdown, :flush_emit

def initialize
super
@output = Fluent::FlowCounterSimpleOutput.new
end
include ::Fluent::FlowcounterSimple

def filter_stream(tag, es)
@output.emit(tag, es, Fluent::NullOutputChain.instance)
process(tag, es)
es
end
end if defined?(Fluent::Filter)
101 changes: 101 additions & 0 deletions lib/fluent/plugin/flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
module Fluent
module FlowcounterSimple
attr_accessor :last_checked

def self.included(klass)
klass.config_param :indicator, :string, :default => 'num'
klass.config_param :unit, :string, :default => 'second'
klass.config_param :comment, :string, :default => nil
end

def configure(conf)
super

@indicator_proc =
case @indicator
when 'num' then Proc.new {|record| 1 }
when 'byte' then Proc.new {|record| record.to_msgpack.size }
else
raise Fluent::ConfigError, "flowcounter-simple count allows num/byte"
end
@unit =
case @unit
when 'second' then :second
when 'minute' then :minute
when 'hour' then :hour
when 'day' then :day
else
raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day"
end
@tick =
case @unit
when :second then 1
when :minute then 60
when :hour then 3600
when :day then 86400
else
raise RuntimeError, "@unit must be one of second/minute/hour/day"
end

@output_proc =
if @comment
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
else
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
end

@count = 0
@mutex = Mutex.new
end

def start
super
start_watch
end

def shutdown
super
@watcher.terminate
@watcher.join
end

def countup(count)
@mutex.synchronize {
@count = (@count || 0) + count
}
end

def flush_emit(step)
count, @count = @count, 0
if count > 0
log.info @output_proc.call(count)
end
end

def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
end

def watch
# instance variable, and public accessable, for test
@last_checked = Fluent::Engine.now
while true
sleep 0.1
if Fluent::Engine.now - @last_checked >= @tick
now = Fluent::Engine.now
flush_emit(now - @last_checked)
@last_checked = now
end
end
end

def process(tag, es)
count = 0
es.each {|time,record|
count += @indicator_proc.call(record)
}
countup(count)
end
end
end
97 changes: 4 additions & 93 deletions lib/fluent/plugin/out_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleOutput < Fluent::Output
Fluent::Plugin.register_output('flowcounter_simple', self)

Expand All @@ -6,101 +8,10 @@ class Fluent::FlowCounterSimpleOutput < Fluent::Output
define_method("log") { $log }
end

config_param :indicator, :string, :default => 'num'
config_param :unit, :string, :default => 'second'
config_param :comment, :string, :default => nil

attr_accessor :last_checked

def configure(conf)
super

@indicator_proc =
case @indicator
when 'num' then Proc.new {|record| 1 }
when 'byte' then Proc.new {|record| record.to_msgpack.size }
else
raise Fluent::ConfigError, "flowcounter-simple count allows num/byte"
end
@unit =
case @unit
when 'second' then :second
when 'minute' then :minute
when 'hour' then :hour
when 'day' then :day
else
raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day"
end
@tick =
case @unit
when :second then 1
when :minute then 60
when :hour then 3600
when :day then 86400
else
raise RuntimeError, "@unit must be one of second/minute/hour/day"
end

@output_proc =
if @comment
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
else
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
end

@count = 0
@mutex = Mutex.new
end

def start
super
start_watch
end

def shutdown
super
@watcher.terminate
@watcher.join
end

def countup(count)
@mutex.synchronize {
@count = (@count || 0) + count
}
end

def flush_emit(step)
count, @count = @count, 0
if count > 0
log.info @output_proc.call(count)
end
end

def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
end

def watch
# instance variable, and public accessable, for test
@last_checked = Fluent::Engine.now
while true
sleep 0.1
if Fluent::Engine.now - @last_checked >= @tick
now = Fluent::Engine.now
flush_emit(now - @last_checked)
@last_checked = now
end
end
end
include ::Fluent::FlowcounterSimple

def emit(tag, es, chain)
count = 0
es.each {|time,record|
count += @indicator_proc.call(record)
}
countup(count)

process(tag, es)
chain.next
end
end
19 changes: 13 additions & 6 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

class Test::Unit::TestCase
def capture_log(log)
tmp = log.out
log.out = StringIO.new
yield
return log.out.string
ensure
log.out = tmp
if defined?(Fluent::Test::TestLogger) and log.is_a?(Fluent::Test::TestLogger) # v0.14
yield
log.out.logs.join("\n")
else
begin
tmp = log.out
log.out = StringIO.new
yield
return log.out.string
ensure
log.out = tmp
end
end
end
end
8 changes: 4 additions & 4 deletions test/plugin/test_filter_flowcounter_simple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ def test_filter
d = create_driver
filtered, out = filter(d, msgs)
assert_equal msgs, filtered
assert( out.include?("count:20"), out )
assert { out.include?("count:20") }
end

private

def filter(d, msgs)
stub(d.instance.output).start
stub(d.instance.output).shutdown
stub(d.instance).start
stub(d.instance).shutdown
d.run {
msgs.each {|msg|
d.filter(msg, @time)
}
}
out = capture_log(d.instance.output.log) do
out = capture_log(d.instance.log) do
d.instance.flush_emit(0)
end
filtered = d.filtered_as_array
Expand Down
14 changes: 7 additions & 7 deletions test/plugin/test_out_flowcounter_simple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ def create_driver(conf=CONFIG,tag='test')

def test_configure
assert_nothing_raised {
d = create_driver('')
create_driver('')
}
assert_nothing_raised {
d = create_driver(CONFIG)
create_driver(CONFIG)
}
assert_nothing_raised {
d = create_driver(CONFIG + %[indicator num])
create_driver(CONFIG + %[indicator num])
}
assert_nothing_raised {
d = create_driver(CONFIG + %[indicator byte])
create_driver(CONFIG + %[indicator byte])
}
end

Expand All @@ -38,7 +38,7 @@ def test_num
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
assert( out.include?("count:30"), out )
assert { out.include?("count:30") }
end

def test_byte
Expand All @@ -51,7 +51,7 @@ def test_byte
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
assert( out =~ /count:\d+\tindicator:byte\tunit:second/, out )
assert { out =~ /count:\d+\tindicator:byte\tunit:second/ }
end

def test_comment
Expand All @@ -64,6 +64,6 @@ def test_comment
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
assert( out.include?("comment:foobar"), out )
assert { out.include?("comment:foobar") }
end
end

0 comments on commit 6f2b98c

Please sign in to comment.