Skip to content
This repository has been archived by the owner on May 27, 2024. It is now read-only.

Commit

Permalink
Merge pull request #9 from nscott/feature/tcp-sink
Browse files Browse the repository at this point in the history
Add the TCP sink. Add support for log group name and log stream name. Add the Units reference.
  • Loading branch information
metaskills committed Apr 17, 2023
2 parents 67096d6 + 5a9ff2c commit f9973f7
Show file tree
Hide file tree
Showing 15 changed files with 418 additions and 41 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

See this http:https://keepachangelog.com link for information on how we want this documented formatted.

## v0.7.0

### Added

- A new `Tcp` sink is added. The `Tcp` sink can send EMF messages to any valid TCP endpoint.
- `tcp-client` will be required at runtime if you use the `Tcp` sink. Originally a small hand-written TCP client was used but it proved unreliable and well outside the scope of this library.
- Added the new `Units` class for easy reference to the accepted metric units.

## v0.6.0

### Fixed
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ gem 'rubocop'
group :test do
gem 'mocha'
gem 'pry'
gem 'tcp-client'
end
52 changes: 29 additions & 23 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,41 +1,46 @@
PATH
remote: .
specs:
aws-embedded-metrics-customink (0.6.0)
aws-embedded-metrics-customink (0.7.0)
concurrent-ruby

GEM
remote: https://rubygems.org/
specs:
ast (2.4.1)
ast (2.4.2)
coderay (1.1.3)
concurrent-ruby (1.1.7)
concurrent-ruby (1.1.10)
json (2.6.3)
method_source (1.0.0)
minitest (5.14.1)
mocha (1.11.2)
parallel (1.19.2)
parser (2.7.1.4)
minitest (5.17.0)
mocha (2.0.2)
ruby2_keywords (>= 0.0.5)
parallel (1.22.1)
parser (3.2.0.0)
ast (~> 2.4.1)
pry (0.13.1)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
rainbow (3.0.0)
rake (13.0.1)
regexp_parser (1.7.1)
rexml (3.2.3)
rubocop (0.88.0)
rainbow (3.1.1)
rake (13.0.6)
regexp_parser (2.6.1)
rexml (3.2.5)
rubocop (1.43.0)
json (~> 2.3)
parallel (~> 1.10)
parser (>= 2.7.1.1)
parser (>= 3.2.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.7)
rexml
rubocop-ast (>= 0.1.0, < 1.0)
regexp_parser (>= 1.8, < 3.0)
rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.24.1, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 2.0)
rubocop-ast (0.2.0)
parser (>= 2.7.0.1)
ruby-progressbar (1.10.1)
unicode-display_width (1.7.0)
unicode-display_width (>= 2.4.0, < 3.0)
rubocop-ast (1.24.1)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
ruby2_keywords (0.0.5)
tcp-client (0.11.4)
unicode-display_width (2.4.2)

PLATFORMS
ruby
Expand All @@ -47,6 +52,7 @@ DEPENDENCIES
pry
rake
rubocop
tcp-client

BUNDLED WITH
2.1.2
2.3.5
18 changes: 14 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ Pulled from these two projects using the [Embedded Metric Format Specification](

However, unlike these projects, we differ in the following ways. Again, contributions are very much welcome if you want to see more or change this.

* Initial focus on Lambda. No other sinks.
* As such, no default Dimensions or Configuraiton for:
- `LogGroupName`
- `LogStreamName`
* Initial focus on Lambda. A TCP sink has been added, but no UDP sink exists.
* No default Dimensions or Configuration for:
- `ServiceName`
- `ServiceType`

Expand All @@ -39,6 +37,9 @@ Simple configuration:
```ruby
Aws::Embedded::Metrics.configure do |c|
c.namespace = 'MyApplication'
# Optional
c.log_group_name = 'MyLogGroup'
c.log_stream_name = 'MyLogStream-UniqueID'
end
```

Expand All @@ -50,6 +51,15 @@ Aws::Embedded::Metrics.configure do |c|
end
```

Using the `Tcp` sink to write over a network:

```ruby
Aws::Embedded::Metrics.configure do |c|
c.sink = Aws::Embedded::Metrics::Sinks::Tcp.new(conn_str: "tcp:https://localhost:25888",
logger: Rails.logger)
end
```

Usage is in a scope block. All metrics are flushed afterward

```ruby
Expand Down
1 change: 1 addition & 0 deletions lib/aws-embedded-metrics-customink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'aws-embedded-metrics-customink/sinks'
require 'aws-embedded-metrics-customink/config'
require 'aws-embedded-metrics-customink/logger'
require 'aws-embedded-metrics-customink/units'
require 'aws-embedded-metrics-customink/instance' if defined?(Rails)

module Aws
Expand Down
20 changes: 17 additions & 3 deletions lib/aws-embedded-metrics-customink/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,38 @@ def config

class Configuration

attr_writer :namespace, :sink
attr_writer :log_group_name,
:log_stream_name,
:namespace,
:sink

def reconfigure
instance_variables.each { |var| instance_variable_set var, nil }
yield(self) if block_given?
self
end

def log_group_name
return @log_group_name if defined?(@log_group_name)

ENV.fetch('AWS_EMF_LOG_GROUP_NAME', nil)
end

def log_stream_name
return @log_stream_name if defined?(@log_stream_name)

ENV.fetch('AWS_EMF_LOG_STREAM_NAME', nil)
end

def namespace
return @namespace if defined?(@namespace)

ENV['AWS_EMF_NAMESPACE'] || 'aws-embedded-metrics'
ENV.fetch('AWS_EMF_NAMESPACE', 'aws-embedded-metrics')
end

def sink
@sink ||= DEFAULT_SINK.new
end

end
end
end
Expand Down
25 changes: 15 additions & 10 deletions lib/aws-embedded-metrics-customink/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Logger

def initialize(sink = Config.config.sink)
@sink = sink
@log_group_name = Config.config.log_group_name
@log_stream_name = Config.config.log_stream_name
@namespace = Config.config.namespace
@dimensions = Concurrent::Array.new
@metrics = Concurrent::Array.new
Expand Down Expand Up @@ -50,17 +52,20 @@ def empty?
end

def message
aws = {
'Timestamp' => timestamp,
'CloudWatchMetrics' => [{
'Namespace' => @namespace,
'Dimensions' => [@dimensions.map(&:keys).flatten],
'Metrics' => @metrics
}]
}

aws['LogGroupName'] = @log_group_name if @log_group_name
aws['LogStreamName'] = @log_stream_name if @log_stream_name

{
'_aws' => {
'Timestamp' => timestamp,
'CloudWatchMetrics' => [
{
'Namespace' => @namespace,
'Dimensions' => [@dimensions.map(&:keys).flatten],
'Metrics' => @metrics
}
]
}
'_aws' => aws
}.tap do |m|
@dimensions.each { |dim| m.merge!(dim) }
m.merge!(@properties)
Expand Down
3 changes: 3 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
require 'aws-embedded-metrics-customink/sinks/logger'
require 'aws-embedded-metrics-customink/sinks/stdout'
require 'aws-embedded-metrics-customink/sinks/tcp'

require 'aws-embedded-metrics-customink/sinks/sink_error'
9 changes: 9 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks/sink_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Aws
module Embedded
module Metrics
module Sinks
class Error < StandardError; end
end
end
end
end
93 changes: 93 additions & 0 deletions lib/aws-embedded-metrics-customink/sinks/tcp.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

gem 'tcp-client'
require 'tcp-client'

module Aws
module Embedded
module Metrics
module Sinks
#
# Create a sink that will communicate to a CloudWatch Log Agent over a TCP connection.
#
# See https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html
# for configuration information
class Tcp
AWS_EMF_AGENT_ENDPOINT_ENV_VAR = 'AWS_EMF_AGENT_ENDPOINT'
attr_reader :queue
attr :client_opts

#
# Create a new TCP sink. It will use the +AWS_EMF_AGENT_ENDPOINT+ environment variable by default to
# connect to a CloudWatch Metric Agent.
#
# @param conn_str [String] A connection string, formatted like 'tcp:https://127.0.0.1:25888'.
# @param conn_timeout_secs [Numeric] The number of seconds before timing out the connection to the agent.
# @param write_timeout_secs [Numeric] The number of seconds to wait before timing out a write.
# @param logger [Logger] A standard Ruby logger to propagate warnings and errors.
# Suggested to use Rails.logger.
def initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil),
conn_timeout_secs: 10,
write_timeout_secs: 10,
logger: nil)
if conn_str.nil?
raise Sinks::Error, "Must specify a connection string or set environment variable #{AWS_EMF_AGENT_ENDPOINT_ENV_VAR}"
end

@logger = logger
@cw_agent_uri = URI.parse(conn_str)
if @cw_agent_uri.scheme != 'tcp' || !@cw_agent_uri.host || !@cw_agent_uri.port
raise Sinks::Error, "Expected connection string to be in format tcp:https://<host>:<port>, got '#{conn_str}'"
end

@client_opts = TCPClient::Configuration.create(
buffered: true,
keep_alive: true,
reverse_lookup: true,
connect_timeout: conn_timeout_secs,
write_timeout: write_timeout_secs
)
@conn = nil
end

def log_warn(msg)
@logger&.warn(msg)
end

def log_err(msg)
@logger&.error(msg)
end

def create_conn(host, port, opts)
TCPClient.open("#{host}:#{port}", opts)
end

def connection
@conn = create_conn(@cw_agent_uri.host, @cw_agent_uri.port, @client_opts) if @conn.nil? || @conn.closed?
@conn
end

def send_message(message)
retries = 2
conn = nil
begin
conn = connection
conn.write(message)
rescue Errno::ECONNREFUSED
conn.close unless conn.nil? || conn.closed?
log_warn("Could not connect to CloudWatch Agent at #{@cw_agent_uri.scheme}:https://#{@cw_agent_uri.host}:#{@cw_agent_uri.port}")
retries -= 1
retry if retries >= 0
rescue StandardError => e
log_err("#{e.class}: #{e.message}: #{e.backtrace.join("\n")}")
end
end

def accept(message)
send_message("#{JSON.dump(message)}\n")
end
end
end
end
end
end
49 changes: 49 additions & 0 deletions lib/aws-embedded-metrics-customink/units.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

module Aws
module Embedded
module Metrics
class Units
# Time
SECONDS = 'Seconds'
MICROSECONDS = 'Microseconds'
MILLISECONDS = 'Milliseconds'

# Size
BYTES = 'Bytes'
KILOBYTES = 'Kilobytes'
MEGABYTES = 'Megabytes'
GIGABYTES = 'Gigabytes'
TERABYTES = 'Terabytes'

BITS = 'Bits'
KILOBITS = 'Kilobits'
MEGABITS = 'Megabits'
GIGABITS = 'Gigabits'
TERABITS = 'Terabits'

# Simple units
PERCENT = 'Percent'
COUNT = 'Count'

# Size over time
BYTES_SECOND = "#{BYTES}/Second"
KILOBYTES_SECOND = "#{KILOBYTES}/Second"
MEGABYTES_SECOND = "#{MEGABYTES}/Second"
GIGABYTES_SECOND = "#{GIGABYTES}/Second"
TERABYTES_SECOND = "#{TERABYTES}/Second"

BITS_SECOND = "#{BITS}/Second"
KILOBITS_SECOND = "#{KILOBITS}/Second"
MEGABITS_SECOND = "#{MEGABITS}/Second"
GIGABITS_SECOND = "#{GIGABITS}/Second"
TERABITS_SECOND = "#{TERABITS}/Second"

COUNT_SECOND = "#{COUNT}/Second"

# Unit-less
NONE = 'None'
end
end
end
end
2 changes: 1 addition & 1 deletion lib/aws-embedded-metrics-customink/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Aws
module Embedded
module Metrics
VERSION = '0.6.0'.freeze
VERSION = '0.7.0'.freeze
end
end
end
Loading

0 comments on commit f9973f7

Please sign in to comment.