Skip to content

Commit

Permalink
stabilize and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
codekitchen committed Mar 3, 2015
1 parent 59e2600 commit b16acc9
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 53 deletions.
2 changes: 2 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--color
--require spec_helper
7 changes: 7 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
require "bundler/gem_tasks"

begin
require 'rspec/core/rake_task'
RSpec::Core::RakeTask.new(:spec)
rescue LoadError
end

task default: "spec"
103 changes: 51 additions & 52 deletions lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,34 @@
require "logstash/environment"
require "logstash/namespace"

require "logstash/inputs/kinesis/version"
require 'logstash-input-kinesis_jars'
require "logstash/inputs/kinesis/version"

# Receive events through an AWS Kinesis stream.
#
# This input plugin uses the Java Kinesis Client Library underneath, so the
# documentation at https://github.com/awslabs/amazon-kinesis-client will be
# useful.
#
# AWS credentials can be specified either through environment variables, or an
# IAM instance role. The library uses a DynamoDB table for worker coordination,
# so you'll need to grant access to that as well as to the Kinesis stream. The
# DynamoDB table has the same name as the `application_name` configuration
# option, which defaults to "logstash".
#
# The library can optionally also send worker statistics to CloudWatch.
class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker
require "logstash/inputs/kinesis/worker"

config_name 'kinesis'
milestone 1

attr_reader(
:kcl_config,
:kcl_worker,
)

# The application name used for the dynamodb coordination table. Must be
# unique for this kinesis stream.
config :application_name, :validate => :string, :default => "logstash"
Expand All @@ -23,77 +42,57 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# How many seconds between worker checkpoints to dynamodb.
config :checkpoint_interval_seconds, :validate => :number, :default => 60

# Worker metric tracking. By default this is disabled, set it to "cloudwatch"
# to enable the cloudwatch integration in the Kinesis Client Library.
config :metrics, :validate => [nil, "cloudwatch"], :default => nil

def initialize(params = {}, kcl_class = KCL::Worker)
@kcl_class = kcl_class
super(params)
end

def register
# the INFO log level is extremely noisy in KCL
org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis").
logger.setLevel(java.util.logging::Level::WARNING)

worker_id = java.util::UUID.randomUUID.to_s
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new()
@config = KCL::KinesisClientLibConfiguration.new(
@kcl_config = KCL::KinesisClientLibConfiguration.new(
@application_name,
@kinesis_stream_name,
creds,
worker_id).withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON)
end

def run(output_queue)
@worker = KCL::Worker.new(
proc { Worker.new(@codec, output_queue, method(:decorate), @checkpoint_interval_seconds) },
@config,
com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory.new)
@worker.run()
worker_factory = proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
if metrics_factory
@kcl_worker = @kcl_class.new(
worker_factory,
@kcl_config,
metrics_factory)
else
@kcl_worker = @kcl_class.new(
worker_factory,
@kcl_config)
end

@kcl_worker.run()
end

def teardown
@worker.shutdown if @worker
@kcl_worker.shutdown if @kcl_worker
end

class Worker
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor

def initialize(*args)
# nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
if !@constructed
@codec, @output_queue, @decorator, @checkpoint_interval = args
@next_checkpoint = Time.now - 600
@constructed = true
else
_shard_id, _ = args
@decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
end
end

def processRecords(records, checkpointer)
records.each { |record| process_record(record) }
if Time.now >= @next_checkpoint
checkpoint(checkpointer)
@next_checkpoint = Time.now + @checkpoint_interval
end
end

def shutdown(checkpointer, reason)
if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
checkpoint(checkpointer)
end
end

protected

def checkpoint(checkpointer)
checkpointer.checkpoint()
rescue => error
@logger.error("Kinesis worker failed checkpointing: #{error}")
end
protected

def process_record(record)
raw = @decoder.decode(record.getData).to_s
@codec.decode(raw) do |event|
@decorator.call(event)
@output_queue << event
end
rescue => error
@logger.error("Error processing record: #{error}")
def metrics_factory
case @metrics
when nil
com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory.new
when 'cloudwatch'
nil # default in the underlying library
end
end
end
2 changes: 1 addition & 1 deletion lib/logstash/inputs/kinesis/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Logstash
module Input
module Kinesis
VERSION = "1.1.0"
VERSION = "1.2.1"
end
end
end
56 changes: 56 additions & 0 deletions lib/logstash/inputs/kinesis/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
class LogStash::Inputs::Kinesis::Worker
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor

attr_reader(
:checkpoint_interval,
:codec,
:decorator,
:logger,
:output_queue,
)

def initialize(*args)
# nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
if !@constructed
@codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
@next_checkpoint = Time.now - 600
@constructed = true
else
_shard_id, _ = args
@decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
end
end
public :initialize

def processRecords(records, checkpointer)
records.each { |record| process_record(record) }
if Time.now >= @next_checkpoint
checkpoint(checkpointer)
@next_checkpoint = Time.now + @checkpoint_interval
end
end

def shutdown(checkpointer, reason)
if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
checkpoint(checkpointer)
end
end

protected

def checkpoint(checkpointer)
checkpointer.checkpoint()
rescue => error
@logger.error("Kinesis worker failed checkpointing: #{error}")
end

def process_record(record)
raw = @decoder.decode(record.getData).to_s
@codec.decode(raw) do |event|
@decorator.call(event)
@output_queue << event
end
rescue => error
@logger.error("Error processing record: #{error}")
end
end
3 changes: 3 additions & 0 deletions logstash-input-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ Gem::Specification.new do |spec|

spec.add_development_dependency "bundler", "~> 1.7"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rspec", "~> 3.2.0"
spec.add_development_dependency "logstash-core"
spec.add_development_dependency "logstash-codec-json"
end
58 changes: 58 additions & 0 deletions spec/inputs/kinesis/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
require "logstash/plugin"
require "logstash/inputs/kinesis"
require "logstash/codecs/json"
require "json"

RSpec.describe "LogStash::Inputs::Kinesis::Worker" do
subject!(:worker) { LogStash::Inputs::Kinesis::Worker.new(codec, queue, decorator, checkpoint_interval) }
let(:codec) { LogStash::Codecs::JSON.new() }
let(:queue) { Queue.new }
let(:decorator) { proc { |x| x["decorated"] = true; x } }
let(:checkpoint_interval) { 120 }
let(:checkpointer) { double('checkpointer', checkpoint: nil) }
let(:shard_id) { "xyz" }

it "honors the initialize java interface method contract" do
expect { worker.initialize(shard_id) }.to_not raise_error
end

def record(hash = { "message" => "test" })
encoder = java.nio.charset::Charset.forName("UTF-8").newEncoder()
data = encoder.encode(java.nio.CharBuffer.wrap(JSON.generate(hash)))
double(getData: data)
end

let(:record1) { record(id: "record1", message: "test1") }
let(:record2) { record(id: "record2", message: "test2") }

context "initialized" do
before do
worker.initialize(shard_id)
end

describe "#processRecords" do
it "decodes and queues each record with decoration" do
worker.processRecords([record1, record2], checkpointer)
m1 = queue.pop
m2 = queue.pop
expect(m1).to be_kind_of(LogStash::Event)
expect(m2).to be_kind_of(LogStash::Event)
expect(m1['id']).to eq("record1")
expect(m1['message']).to eq("test1")
expect(m1['decorated']).to eq(true)
end

it "checkpoints on interval" do
expect(checkpointer).to receive(:checkpoint).once
worker.processRecords([], checkpointer)

# not this time
worker.processRecords([], checkpointer)

allow(Time).to receive(:now).and_return(Time.now + 125)
expect(checkpointer).to receive(:checkpoint).once
worker.processRecords([], checkpointer)
end
end
end
end
80 changes: 80 additions & 0 deletions spec/inputs/kinesis_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require "logstash/plugin"
require "logstash/inputs/kinesis"
require "logstash/codecs/json"

RSpec.describe "inputs/kinesis" do
KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker

# It's very difficult to directly test the Java class, with all its private members,
# so we subclass it here.
class TestKCLWorker < KCL::Worker
field_reader :metricsFactory, :recordProcessorFactory
def run
end
end

let(:config) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
}}

subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config, kcl_class) }
let(:kcl_class) { TestKCLWorker }
let(:metrics) { nil }
let(:codec) { LogStash::Codecs::JSON.new() }

it "registers without error" do
input = LogStash::Plugin.lookup("input", "kinesis").new("kinesis_stream_name" => "specs")
expect { input.register }.to_not raise_error
end

it "configures the KCL" do
kinesis.register
expect(kinesis.kcl_config.applicationName).to eq("my-processor")
expect(kinesis.kcl_config.streamName).to eq("run-specs")
expect(kinesis.kcl_config.initialPositionInStream).to eq(KCL::InitialPositionInStream::TRIM_HORIZON)
end

context "#run" do
let(:queue) { Queue.new }

before do
kinesis.register
end

it "clones the codec for each worker" do
expect(codec).to receive(:clone).once
kinesis.run(queue)
worker = kinesis.kcl_worker.recordProcessorFactory.call()
expect(worker).to be_kind_of(LogStash::Inputs::Kinesis::Worker)
end

it "generates a valid worker via the factory proc" do
kinesis.run(queue)
worker = kinesis.kcl_worker.recordProcessorFactory.call()
expect(worker.codec).to be_kind_of(codec.class)
expect(worker.checkpoint_interval).to eq(120)
expect(worker.output_queue).to eq(queue)
expect(worker.decorator).to eq(kinesis.method(:decorate))
expect(worker.logger).to eq(kinesis.logger)
end

it "disables metric tracking by default" do
kinesis.run(queue)
expect(kinesis.kcl_worker.metricsFactory).to be_kind_of(com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory)
end

context "cloudwatch" do
let(:metrics) { "cloudwatch" }
it "uses cloudwatch metrics if specified" do
kinesis.run(queue)
expect(kinesis.kcl_worker.metricsFactory).to be_kind_of(com.amazonaws.services.kinesis.metrics.impl::CWMetricsFactory)
# the process hangs otherwise
kinesis.kcl_worker.metricsFactory.shutdown
end
end
end
end
17 changes: 17 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
RSpec.configure do |config|
config.expect_with :rspec do |expectations|
expectations.include_chain_clauses_in_custom_matcher_descriptions = true
end

config.mock_with :rspec do |mocks|
mocks.verify_partial_doubles = true
end

config.disable_monkey_patching!
config.warnings = false
if config.files_to_run.one?
config.default_formatter = 'doc'
end
config.order = :random
Kernel.srand config.seed
end

0 comments on commit b16acc9

Please sign in to comment.