Skip to content

Commit

Permalink
Use v1 plugin API
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Jun 30, 2020
1 parent 6f2b98c commit f4fefd0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 40 deletions.
17 changes: 10 additions & 7 deletions lib/fluent/plugin/filter_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
require 'fluent/plugin/filter'
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleFilter < Fluent::Filter
Fluent::Plugin.register_filter('flowcounter_simple', self)
module Fluent::Plugin
class FlowCounterSimpleFilter < Filter
Fluent::Plugin.register_filter('flowcounter_simple', self)

include ::Fluent::FlowcounterSimple
include ::Fluent::FlowcounterSimple

def filter_stream(tag, es)
process(tag, es)
es
def filter_stream(tag, es)
process_count(tag, es)
es
end
end
end if defined?(Fluent::Filter)
end
45 changes: 22 additions & 23 deletions lib/fluent/plugin/flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
require 'fluent/plugin_helper/thread'

module Fluent
module FlowcounterSimple
attr_accessor :last_checked

def self.included(klass)
klass.helpers :thread
klass.config_param :indicator, :string, :default => 'num'
klass.config_param :unit, :string, :default => 'second'
klass.config_param :comment, :string, :default => nil
Expand All @@ -13,8 +16,14 @@ def configure(conf)

@indicator_proc =
case @indicator
when 'num' then Proc.new {|record| 1 }
when 'byte' then Proc.new {|record| record.to_msgpack.size }
when 'num' then Proc.new { |es| es.size }
when 'byte' then Proc.new { |es|
count = 0
es.each { |time, record|
count += record.to_msgpack.size
}
count
}
else
raise Fluent::ConfigError, "flowcounter-simple count allows num/byte"
end
Expand All @@ -34,14 +43,15 @@ def configure(conf)
when :hour then 3600
when :day then 86400
else
raise RuntimeError, "@unit must be one of second/minute/hour/day"
raise Fluent::ConfigError, "@unit must be one of second/minute/hour/day"
end

@type_str = self.is_a?(Fluent::Plugin::Filter) ? 'filter' : 'out'
@output_proc =
if @comment
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
else
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
end

@count = 0
Expand All @@ -50,13 +60,11 @@ def configure(conf)

def start
super
start_watch
thread_create(:flowcounter_simple_watch, &method(:watch))
end

def shutdown
super
@watcher.terminate
@watcher.join
end

def countup(count)
Expand All @@ -72,30 +80,21 @@ def flush_emit(step)
end
end

def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
end

def watch
# instance variable, and public accessable, for test
@last_checked = Fluent::Engine.now
while true
@last_checked = Fluent::EventTime.now
while thread_current_running?
sleep 0.1
if Fluent::Engine.now - @last_checked >= @tick
now = Fluent::Engine.now
if Fluent::EventTime.now - @last_checked >= @tick
now = Fluent::EventTime.now
flush_emit(now - @last_checked)
@last_checked = now
end
end
end

def process(tag, es)
count = 0
es.each {|time,record|
count += @indicator_proc.call(record)
}
countup(count)
def process_count(tag, es)
countup(@indicator_proc.call(es))
end
end
end
25 changes: 15 additions & 10 deletions lib/fluent/plugin/out_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
require 'fluent/plugin/output'
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleOutput < Fluent::Output
Fluent::Plugin.register_output('flowcounter_simple', self)
module Fluent::Plugin
class FlowCounterSimpleOutput < Output
Fluent::Plugin.register_output('flowcounter_simple', self)

# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
define_method("log") { $log }
end
include ::Fluent::FlowcounterSimple

def prefer_buffered_processing
false
end

include ::Fluent::FlowcounterSimple
def multi_workers_ready?
true
end

def emit(tag, es, chain)
process(tag, es)
chain.next
def process(tag, es)
process_count(tag, es)
end
end
end

0 comments on commit f4fefd0

Please sign in to comment.