-
-
Notifications
You must be signed in to change notification settings - Fork 484
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Metrics API and aggregator (#2247)
* new `Sentry::Metrics` module with 4 apis that map to the new 4 `Sentry::Metrics::Metric` classes * `increment` - simple counter * `distribution` - array of observations * `gauge` - statistics (last/min/max/sum/count) * `set` - unique values * new `Sentry::Metrics::Aggregator` that starts a thread that flushes pending metric buckets in 5 second intervals * buckets are a nested hash of timestamp (rolled to 10 second intervals) -> metric keys -> actual metric instance * there is a random `flush_shift` once per startup to create jittering * flushable buckets are sent in a new `statsd` type envelope that is not json so made a small change to the `Envelope::Item` * tag key/values are sanitized for unicode/special characters according to the two regexes Reference spec - https://develop.sentry.dev/sdk/metrics/ part of #2246
- Loading branch information
1 parent
478c4cf
commit d13923b
Showing
25 changed files
with
979 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'sentry/metrics/metric' | ||
require 'sentry/metrics/counter_metric' | ||
require 'sentry/metrics/distribution_metric' | ||
require 'sentry/metrics/gauge_metric' | ||
require 'sentry/metrics/set_metric' | ||
require 'sentry/metrics/aggregator' | ||
|
||
module Sentry | ||
module Metrics | ||
class << self | ||
def increment(key, value = 1.0, unit: 'none', tags: {}, timestamp: nil) | ||
Sentry.metrics_aggregator&.add(:c, key, value, unit: unit, tags: tags, timestamp: timestamp) | ||
end | ||
|
||
def distribution(key, value, unit: 'none', tags: {}, timestamp: nil) | ||
Sentry.metrics_aggregator&.add(:d, key, value, unit: unit, tags: tags, timestamp: timestamp) | ||
end | ||
|
||
def set(key, value, unit: 'none', tags: {}, timestamp: nil) | ||
Sentry.metrics_aggregator&.add(:s, key, value, unit: unit, tags: tags, timestamp: timestamp) | ||
end | ||
|
||
def gauge(key, value, unit: 'none', tags: {}, timestamp: nil) | ||
Sentry.metrics_aggregator&.add(:g, key, value, unit: unit, tags: tags, timestamp: timestamp) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
# frozen_string_literal: true | ||
|
||
module Sentry | ||
module Metrics | ||
class Aggregator | ||
include LoggingHelper | ||
|
||
FLUSH_INTERVAL = 5 | ||
ROLLUP_IN_SECONDS = 10 | ||
|
||
KEY_SANITIZATION_REGEX = /[^a-zA-Z0-9_\/.-]+/ | ||
VALUE_SANITIZATION_REGEX = /[^[[:word:]][[:digit:]][[:space:]]_:\/@\.{}\[\]$-]+/ | ||
|
||
METRIC_TYPES = { | ||
c: CounterMetric, | ||
d: DistributionMetric, | ||
g: GaugeMetric, | ||
s: SetMetric | ||
} | ||
|
||
# exposed only for testing | ||
attr_reader :thread, :buckets, :flush_shift | ||
|
||
def initialize(configuration, client) | ||
@client = client | ||
@logger = configuration.logger | ||
|
||
@default_tags = {} | ||
@default_tags['release'] = configuration.release if configuration.release | ||
@default_tags['environment'] = configuration.environment if configuration.environment | ||
|
||
@thread = nil | ||
@exited = false | ||
@mutex = Mutex.new | ||
|
||
# buckets are a nested hash of timestamp -> bucket keys -> Metric instance | ||
@buckets = {} | ||
|
||
# the flush interval needs to be shifted once per startup to create jittering | ||
@flush_shift = Random.rand * ROLLUP_IN_SECONDS | ||
end | ||
|
||
def add(type, | ||
key, | ||
value, | ||
unit: 'none', | ||
tags: {}, | ||
timestamp: nil) | ||
return unless ensure_thread | ||
return unless METRIC_TYPES.keys.include?(type) | ||
|
||
timestamp = timestamp.to_i if timestamp.is_a?(Time) | ||
timestamp ||= Sentry.utc_now.to_i | ||
|
||
# this is integer division and thus takes the floor of the division | ||
# and buckets into 10 second intervals | ||
bucket_timestamp = (timestamp / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS | ||
|
||
serialized_tags = serialize_tags(get_updated_tags(tags)) | ||
bucket_key = [type, key, unit, serialized_tags] | ||
|
||
@mutex.synchronize do | ||
@buckets[bucket_timestamp] ||= {} | ||
|
||
if @buckets[bucket_timestamp][bucket_key] | ||
@buckets[bucket_timestamp][bucket_key].add(value) | ||
else | ||
@buckets[bucket_timestamp][bucket_key] = METRIC_TYPES[type].new(value) | ||
end | ||
end | ||
end | ||
|
||
def flush(force: false) | ||
flushable_buckets = get_flushable_buckets!(force) | ||
return if flushable_buckets.empty? | ||
|
||
payload = serialize_buckets(flushable_buckets) | ||
envelope = Envelope.new | ||
envelope.add_item( | ||
{ type: 'statsd', length: payload.bytesize }, | ||
payload | ||
) | ||
|
||
Sentry.background_worker.perform do | ||
@client.transport.send_envelope(envelope) | ||
end | ||
end | ||
|
||
def kill | ||
log_debug('[Metrics::Aggregator] killing thread') | ||
|
||
@exited = true | ||
@thread&.kill | ||
end | ||
|
||
private | ||
|
||
def ensure_thread | ||
return false if @exited | ||
return true if @thread&.alive? | ||
|
||
@thread = Thread.new do | ||
loop do | ||
# TODO-neel-metrics use event for force flush later | ||
sleep(FLUSH_INTERVAL) | ||
flush | ||
end | ||
end | ||
|
||
true | ||
rescue ThreadError | ||
log_debug('[Metrics::Aggregator] thread creation failed') | ||
@exited = true | ||
false | ||
end | ||
|
||
# important to sort for key consistency | ||
def serialize_tags(tags) | ||
tags.flat_map do |k, v| | ||
if v.is_a?(Array) | ||
v.map { |x| [k.to_s, x.to_s] } | ||
else | ||
[[k.to_s, v.to_s]] | ||
end | ||
end.sort | ||
end | ||
|
||
def get_flushable_buckets!(force) | ||
@mutex.synchronize do | ||
flushable_buckets = {} | ||
|
||
if force | ||
flushable_buckets = @buckets | ||
@buckets = {} | ||
else | ||
cutoff = Sentry.utc_now.to_i - ROLLUP_IN_SECONDS - @flush_shift | ||
flushable_buckets = @buckets.select { |k, _| k <= cutoff } | ||
@buckets.reject! { |k, _| k <= cutoff } | ||
end | ||
|
||
flushable_buckets | ||
end | ||
end | ||
|
||
# serialize buckets to statsd format | ||
def serialize_buckets(buckets) | ||
buckets.map do |timestamp, timestamp_buckets| | ||
timestamp_buckets.map do |metric_key, metric| | ||
type, key, unit, tags = metric_key | ||
values = metric.serialize.join(':') | ||
sanitized_tags = tags.map { |k, v| "#{sanitize_key(k)}:#{sanitize_value(v)}" }.join(',') | ||
|
||
"#{sanitize_key(key)}@#{unit}:#{values}|#{type}|\##{sanitized_tags}|T#{timestamp}" | ||
end | ||
end.flatten.join("\n") | ||
end | ||
|
||
def sanitize_key(key) | ||
key.gsub(KEY_SANITIZATION_REGEX, '_') | ||
end | ||
|
||
def sanitize_value(value) | ||
value.gsub(VALUE_SANITIZATION_REGEX, '') | ||
end | ||
|
||
def get_transaction_name | ||
scope = Sentry.get_current_scope | ||
return nil unless scope && scope.transaction_name | ||
return nil if scope.transaction_source_low_quality? | ||
|
||
scope.transaction_name | ||
end | ||
|
||
def get_updated_tags(tags) | ||
updated_tags = @default_tags.merge(tags) | ||
|
||
transaction_name = get_transaction_name | ||
updated_tags['transaction'] = transaction_name if transaction_name | ||
|
||
updated_tags | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# frozen_string_literal: true | ||
|
||
module Sentry | ||
module Metrics | ||
class Configuration | ||
# Enable metrics usage | ||
# Starts a new {Sentry::Metrics::Aggregator} instance to aggregate metrics | ||
# and a thread to aggregate flush every 5 seconds. | ||
# @return [Boolean] | ||
attr_accessor :enabled | ||
|
||
def initialize | ||
@enabled = false | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# frozen_string_literal: true | ||
|
||
module Sentry | ||
module Metrics | ||
class CounterMetric < Metric | ||
attr_reader :value | ||
|
||
def initialize(value) | ||
@value = value.to_f | ||
end | ||
|
||
def add(value) | ||
@value += value.to_f | ||
end | ||
|
||
def serialize | ||
[value] | ||
end | ||
|
||
def weight | ||
1 | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.