Skip to content

Commit

Permalink
Merge pull request #5 from repeatedly/use-v1-api
Browse files Browse the repository at this point in the history
Use v1 api
  • Loading branch information
sonots committed Jun 30, 2020
2 parents 6f2b98c + e4d1b64 commit 2d6ad44
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 100 deletions.
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
rvm:
- 2.1.*
- 2.2.*
- 2.3.*
- 2.4.*
- 2.5.*
- 2.6.*
- 2.7.*

gemfile:
- Gemfile
- Gemfile.v0.10
- Gemfile.v0.12

before_install:
- gem update bundler
4 changes: 0 additions & 4 deletions Gemfile.v0.10

This file was deleted.

4 changes: 0 additions & 4 deletions Gemfile.v0.12

This file was deleted.

27 changes: 16 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@

Simple Fluentd Plugin to count number of messages and outputs to log

## Output Plugin Configuration
## Requirements

<match foo.bar.**>
type flowcounter_simple
unit second
</match>

This plugin does not emit, just writes counts into the log file as

plugin:out_flowcounter_simple count:30 indicator:num unit:second
| fluent-plugin-flowcounter-simple | fluentd | ruby |
|-------------------|---------|------|
| >= 0.1.0 | >= v1.0 | >= 2.4 |
| < 0.0.4 | >= v0.12.0 | >= 2.1 |

## Filter Plugin Configuration

Fluentd >= v0.12

```apache
<filter foo.bar.**>
type flowcounter_simple
Expand All @@ -28,6 +22,17 @@ This filter plugin pass through records, and writes counts into the log file as

plugin:out_flowcounter_simple count:30 indicator:num unit:second

## Output Plugin Configuration

<match foo.bar.**>
type flowcounter_simple
unit second
</match>

This plugin does not emit, just writes counts into the log file as

plugin:out_flowcounter_simple count:30 indicator:num unit:second

## Parameters

- unit
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-flowcounter-simple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Gem::Specification.new do |gem|
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
gem.require_paths = ["lib"]

gem.add_runtime_dependency "fluentd"
gem.add_runtime_dependency "fluentd", [">= 1.0"]
gem.add_development_dependency "rake"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-nav"
Expand Down
17 changes: 10 additions & 7 deletions lib/fluent/plugin/filter_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
require 'fluent/plugin/filter'
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleFilter < Fluent::Filter
Fluent::Plugin.register_filter('flowcounter_simple', self)
module Fluent::Plugin
class FlowCounterSimpleFilter < Filter
Fluent::Plugin.register_filter('flowcounter_simple', self)

include ::Fluent::FlowcounterSimple
include ::Fluent::FlowcounterSimple

def filter_stream(tag, es)
process(tag, es)
es
def filter_stream(tag, es)
process_count(tag, es)
es
end
end
end if defined?(Fluent::Filter)
end
45 changes: 22 additions & 23 deletions lib/fluent/plugin/flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
require 'fluent/plugin_helper/thread'

module Fluent
module FlowcounterSimple
attr_accessor :last_checked

def self.included(klass)
klass.helpers :thread
klass.config_param :indicator, :string, :default => 'num'
klass.config_param :unit, :string, :default => 'second'
klass.config_param :comment, :string, :default => nil
Expand All @@ -13,8 +16,14 @@ def configure(conf)

@indicator_proc =
case @indicator
when 'num' then Proc.new {|record| 1 }
when 'byte' then Proc.new {|record| record.to_msgpack.size }
when 'num' then Proc.new { |es| es.size }
when 'byte' then Proc.new { |es|
count = 0
es.each { |time, record|
count += record.to_msgpack.size
}
count
}
else
raise Fluent::ConfigError, "flowcounter-simple count allows num/byte"
end
Expand All @@ -34,14 +43,15 @@ def configure(conf)
when :hour then 3600
when :day then 86400
else
raise RuntimeError, "@unit must be one of second/minute/hour/day"
raise Fluent::ConfigError, "@unit must be one of second/minute/hour/day"
end

@type_str = self.is_a?(Fluent::Plugin::Filter) ? 'filter' : 'out'
@output_proc =
if @comment
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" }
else
Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" }
end

@count = 0
Expand All @@ -50,13 +60,11 @@ def configure(conf)

def start
super
start_watch
thread_create(:flowcounter_simple_watch, &method(:watch))
end

def shutdown
super
@watcher.terminate
@watcher.join
end

def countup(count)
Expand All @@ -72,30 +80,21 @@ def flush_emit(step)
end
end

def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))
end

def watch
# instance variable, and public accessable, for test
@last_checked = Fluent::Engine.now
while true
@last_checked = Fluent::EventTime.now
while thread_current_running?
sleep 0.1
if Fluent::Engine.now - @last_checked >= @tick
now = Fluent::Engine.now
if Fluent::EventTime.now - @last_checked >= @tick
now = Fluent::EventTime.now
flush_emit(now - @last_checked)
@last_checked = now
end
end
end

def process(tag, es)
count = 0
es.each {|time,record|
count += @indicator_proc.call(record)
}
countup(count)
def process_count(tag, es)
countup(@indicator_proc.call(es))
end
end
end
25 changes: 15 additions & 10 deletions lib/fluent/plugin/out_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
require 'fluent/plugin/output'
require_relative 'flowcounter_simple'

class Fluent::FlowCounterSimpleOutput < Fluent::Output
Fluent::Plugin.register_output('flowcounter_simple', self)
module Fluent::Plugin
class FlowCounterSimpleOutput < Output
Fluent::Plugin.register_output('flowcounter_simple', self)

# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
define_method("log") { $log }
end
include ::Fluent::FlowcounterSimple

def prefer_buffered_processing
false
end

include ::Fluent::FlowcounterSimple
def multi_workers_ready?
true
end

def emit(tag, es, chain)
process(tag, es)
chain.next
def process(tag, es)
process_count(tag, es)
end
end
end
12 changes: 3 additions & 9 deletions test/helper.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
require 'rubygems'
require 'bundler'
begin
Bundler.setup(:default, :development)
rescue Bundler::BundlerError => e
$stderr.puts e.message
$stderr.puts "Run `bundle install` to install missing gems"
exit e.status_code
end

require 'test/unit'
require 'test/unit/rr'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))

require 'fluent/test'
require 'fluent/plugin/out_flowcounter_simple'
require 'fluent/plugin/filter_flowcounter_simple'

class Test::Unit::TestCase
def capture_log(log)
Expand Down
17 changes: 7 additions & 10 deletions test/plugin/test_filter_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require_relative '../helper'
require "test/unit/rr"
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_flowcounter_simple'

class FlowCounterSimpleFilterTest < Test::Unit::TestCase
include Fluent
Expand All @@ -14,7 +15,7 @@ def setup
]

def create_driver(conf = CONFIG)
Fluent::Test::FilterTestDriver.new(Fluent::FlowCounterSimpleFilter).configure(conf, true)
Fluent::Test::Driver::Filter.new(Fluent::Plugin::FlowCounterSimpleFilter).configure(conf)
end

def test_filter
Expand All @@ -32,18 +33,14 @@ def test_filter
private

def filter(d, msgs)
stub(d.instance).start
stub(d.instance).shutdown
d.run {
d.run(default_tag: 'test') {
msgs.each {|msg|
d.filter(msg, @time)
d.feed(msg)
}
}
out = capture_log(d.instance.log) do
d.instance.flush_emit(0)
end
filtered = d.filtered_as_array
filtered_msgs = filtered.map {|m| m[2] }
[filtered_msgs, out]
[d.filtered_records, out]
end
end if defined?(Fluent::Filter)
end
34 changes: 18 additions & 16 deletions test/plugin/test_out_flowcounter_simple.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_flowcounter_simple'

class FlowCounterSimpleOutputTest < Test::Unit::TestCase
def setup
Expand All @@ -10,7 +12,7 @@ def setup
]

def create_driver(conf=CONFIG,tag='test')
Fluent::Test::OutputTestDriver.new(Fluent::FlowCounterSimpleOutput, tag).configure(conf)
Fluent::Test::Driver::Output.new(Fluent::Plugin::FlowCounterSimpleOutput).configure(conf)
end

def test_configure
Expand All @@ -29,38 +31,38 @@ def test_configure
end

def test_num
d1 = create_driver(CONFIG, 'test.tag1')
d1.run do
d1 = create_driver(CONFIG)
d1.run(default_tag: 'test.tag1') do
10.times do
d1.emit({'message'=> 'a' * 100})
d1.emit({'message'=> 'b' * 100})
d1.emit({'message'=> 'c' * 100})
d1.feed({'message'=> 'a' * 100})
d1.feed({'message'=> 'b' * 100})
d1.feed({'message'=> 'c' * 100})
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
assert { out.include?("count:30") }
end

def test_byte
d1 = create_driver(CONFIG + %[indicator byte], 'test.tag1')
d1.run do
d1 = create_driver(CONFIG + %[indicator byte])
d1.run(default_tag: 'test.tag1') do
10.times do
d1.emit({'message'=> 'a' * 100})
d1.emit({'message'=> 'b' * 100})
d1.emit({'message'=> 'c' * 100})
d1.feed({'message'=> 'a' * 100})
d1.feed({'message'=> 'b' * 100})
d1.feed({'message'=> 'c' * 100})
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
assert { out =~ /count:\d+\tindicator:byte\tunit:second/ }
end

def test_comment
d1 = create_driver(CONFIG + %[comment foobar], 'test.tag1')
d1.run do
d1 = create_driver(CONFIG + %[comment foobar])
d1.run(default_tag: 'test.tag1') do
1.times do
d1.emit({'message'=> 'a' * 100})
d1.emit({'message'=> 'b' * 100})
d1.emit({'message'=> 'c' * 100})
d1.feed({'message'=> 'a' * 100})
d1.feed({'message'=> 'b' * 100})
d1.feed({'message'=> 'c' * 100})
end
end
out = capture_log(d1.instance.log) { d1.instance.flush_emit(60) }
Expand Down

0 comments on commit 2d6ad44

Please sign in to comment.