Skip to content

Commit

Permalink
Add the TCP and Async sinks. Add support for log group name and log s…
Browse files Browse the repository at this point in the history
…tream name. Add the Units reference.
  • Loading branch information
nscott committed Sep 4, 2022
1 parent 67096d6 commit ca4d8c9
Show file tree
Hide file tree
Showing 17 changed files with 580 additions and 21 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

See this http:https://keepachangelog.com link for information on how we want this documented formatted.

## v0.7.0

### Added

- A new sink now exists, `EcsFargate`! Today the sink assumes you're using the `awsvpc` network configuration for your containers.
- Included a new dependency, `tcp-client`. Originally a small hand-written TCP client was used but it proved unreliable and well outside the scope of this library.
- Added the new `Units` class for easy reference to the accepted metric units.

## v0.6.0

### Fixed
Expand Down
8 changes: 5 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
PATH
remote: .
specs:
aws-embedded-metrics-customink (0.6.0)
aws-embedded-metrics-customink (0.7.0)
concurrent-ruby
tcp-client

GEM
remote: https://rubygems.org/
specs:
ast (2.4.1)
coderay (1.1.3)
concurrent-ruby (1.1.7)
concurrent-ruby (1.1.10)
method_source (1.0.0)
minitest (5.14.1)
mocha (1.11.2)
Expand All @@ -35,6 +36,7 @@ GEM
rubocop-ast (0.2.0)
parser (>= 2.7.0.1)
ruby-progressbar (1.10.1)
tcp-client (0.11.2)
unicode-display_width (1.7.0)

PLATFORMS
Expand All @@ -49,4 +51,4 @@ DEPENDENCIES
rubocop

BUNDLED WITH
2.1.2
2.3.5
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ Pulled from these two projects using the [Embedded Metric Format Specification](

However, unlike these projects, we differ in the following ways. Again, contributions are very much welcome if you want to see more or change this.

* Initial focus on Lambda. No other sinks.
* As such, no default Dimensions or Configuraiton for:
- `LogGroupName`
- `LogStreamName`
* Initial focus on Lambda. A TCP sink has been added, but no UDP sink exists.
* An async sink wrapper exists that allows messages to be queued and sent on a separate thread.
* No default Dimensions or Configuration for:
- `ServiceName`
- `ServiceType`

Expand All @@ -39,6 +38,9 @@ Simple configuration:
```ruby
Aws::Embedded::Metrics.configure do |c|
c.namespace = 'MyApplication'
# Optional
c.log_group_name = 'MyLogGroup'
c.log_stream_name = 'MyLogStream-UniqueID'
end
```

Expand All @@ -50,6 +52,24 @@ Aws::Embedded::Metrics.configure do |c|
end
```

Using the `Tcp` sink to write over a network:

```ruby
Aws::Embedded::Metrics.configure do |c|
c.sink = Aws::Embedded::Metrics::Sinks::Tcp.new(conn_str: "tcp:https://localhost:25888",
logger: Rails.logger)
end
```

Using the `Async` sink wrapper to incur no latency or errors on writes:
```ruby
Aws::Embedded::Metrics.configure do |c|
tcp_sink = Aws::Embedded::Metrics::Sinks::Tcp.new(conn_str: "tcp:https://localhost:25888",
logger: Rails.logger)
c.sink = Aws::Embedded::Metrics::Sinks::Async.new(tcp_sink, logger: Rails.logger, max_queue_size: 1_000)
end
```

Usage is in a scope block. All metrics are flushed afterward

```ruby
Expand Down
1 change: 1 addition & 0 deletions aws-embedded-metrics-customink.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]
spec.add_dependency 'concurrent-ruby'
spec.add_dependency 'tcp-client'
end
1 change: 1 addition & 0 deletions lib/aws-embedded-metrics-customink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'aws-embedded-metrics-customink/sinks'
require 'aws-embedded-metrics-customink/config'
require 'aws-embedded-metrics-customink/logger'
require 'aws-embedded-metrics-customink/units'
require 'aws-embedded-metrics-customink/instance' if defined?(Rails)

module Aws
Expand Down
20 changes: 17 additions & 3 deletions lib/aws-embedded-metrics-customink/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,38 @@ def config

class Configuration

attr_writer :namespace, :sink
attr_writer :log_group_name,
:log_stream_name,
:namespace,
:sink

def reconfigure
instance_variables.each { |var| instance_variable_set var, nil }
yield(self) if block_given?
self
end

def log_group_name
return @log_group_name if defined?(@log_group_name)

ENV.fetch('AWS_EMF_LOG_GROUP_NAME', nil)
end

def log_stream_name
return @log_stream_name if defined?(@log_stream_name)

ENV.fetch('AWS_EMF_LOG_STREAM_NAME', nil)
end

def namespace
return @namespace if defined?(@namespace)

ENV['AWS_EMF_NAMESPACE'] || 'aws-embedded-metrics'
ENV.fetch('AWS_EMF_NAMESPACE', 'aws-embedded-metrics')
end

def sink
@sink ||= DEFAULT_SINK.new
end

end
end
end
Expand Down
25 changes: 15 additions & 10 deletions lib/aws-embedded-metrics-customink/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Logger

def initialize(sink = Config.config.sink)
@sink = sink
@log_group_name = Config.config.log_group_name
@log_stream_name = Config.config.log_stream_name
@namespace = Config.config.namespace
@dimensions = Concurrent::Array.new
@metrics = Concurrent::Array.new
Expand Down Expand Up @@ -50,17 +52,20 @@ def empty?
end

def message
aws = {
'Timestamp' => timestamp,
'CloudWatchMetrics' => [{
'Namespace' => @namespace,
'Dimensions' => [@dimensions.map(&:keys).flatten],
'Metrics' => @metrics
}]
}

aws['LogGroupName'] = @log_group_name if @log_group_name
aws['LogStreamName'] = @log_stream_name if @log_stream_name

{
'_aws' => {
'Timestamp' => timestamp,
'CloudWatchMetrics' => [
{
'Namespace' => @namespace,
'Dimensions' => [@dimensions.map(&:keys).flatten],
'Metrics' => @metrics
}
]
}
'_aws' => aws
}.tap do |m|
@dimensions.each { |dim| m.merge!(dim) }
m.merge!(@properties)
Expand Down
4 changes: 4 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
require 'aws-embedded-metrics-customink/sinks/async'
require 'aws-embedded-metrics-customink/sinks/logger'
require 'aws-embedded-metrics-customink/sinks/stdout'
require 'aws-embedded-metrics-customink/sinks/tcp'

require 'aws-embedded-metrics-customink/sinks/sink_error'
110 changes: 110 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

require 'tcp-client'

module Aws
module Embedded
module Metrics
module Sinks
#
# Create a sink that will immediately take in messages, enqueue them, and forward them on to a sink.
class Async
attr_reader :queue, :sender

#
# Create a new Async sink which wraps an existing sink. This is most beneficial with the tcp sink.
#
# This was built as a performance-critical piece of code since metrics are often sent in high volumes.
# +#accept+, which is what takes in messages to send to the CW Metric Agent, puts messages into a thread-safe
# queue. A separate thread is then picking up from that queue and sending the messages to the chosen sink.
#
# <b>Creating a new Async sink will create a new thread.</b> This sink is intended to be used sparingly.
#
# Messages that sent to the sink; no more information is known about the message after that.
# If the sink cannot process the message, it is lost.
#
# If a message is enqueued and the queue is full, the message is dropped and a warning is logged.
# @param sink [Sink] A sink to wrap. +#accept+ will be the only method called on the sink.
# @param logger [Logger] A standard Ruby logger to propagate warnings and errors.
# Suggested to use Rails.logger.
# @param max_queue_size [Numeric] The number of messages to buffer in-memory.
# A negative value will buffer everything.
def initialize(sink,
logger: nil,
max_queue_size: 1_000)
raise Sinks::Error, 'Must specify a sink to wrap' if sink.nil?

@sink = sink

@max_queue_size = max_queue_size
@queue = Queue.new
@lock = Mutex.new
@stop = false
@logger = logger
start_sender(@queue)
end

def accept(message)
if @max_queue_size > -1 && @queue.length > @max_queue_size
@logger&.warn("Async metrics queue is full (#{@max_queue_size} items)! Dropping metric message.")
return
end

@queue.push(message)
end

#
# Shut down the sink. By default this blocks until all messages are sent to the agent, or
# the wait time elapses. No more messages will be accepted as soon as this is called.
#
# @param wait_time_seconds [Numeric] The seconds to wait for messages to be sent to the agent.
def shutdown(wait_time_seconds = 30)
# We push a "stop" message to ensure there's something in the queue,
# otherwise it will indefinitely block.
# When a "stop message" comes through it will break the loop.
@queue.push(StopMessage.new)
@queue.close

start = Time.now.utc
until @queue.empty? || Time.now.utc > (start + wait_time_seconds)
# Yield this thread until the queue has processed
sleep(0)
end

# If we haven't been able to eat through the queue, this should terminate the loop
# and allow the thread to rejoin.
@lock.synchronize do
@stop = true
end

@sender_thread&.join
end

def should_stop
@lock.synchronize do
@stop
end
end

def start_sender(queue)
@sender_thread = Thread.new do
stop_message_class = StopMessage.new.class
# Infinitely read from the queue and send messages to the agent
until should_stop
# We use a special message class to ensure
message = queue.pop
break if stop_message_class == message.class

@sink.accept(message)
end
end
end

# Special class to signal that the thread should exit and finish.
class StopMessage; end
private_constant :StopMessage
end
end
end
end
end
9 changes: 9 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks/sink_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Aws
module Embedded
module Metrics
module Sinks
class Error < StandardError; end
end
end
end
end
Loading

0 comments on commit ca4d8c9

Please sign in to comment.