From 92b1a66a63aa10fc58c9f2ac4baca859437db40c Mon Sep 17 00:00:00 2001 From: Richard Deurwaarder Date: Thu, 2 May 2019 12:47:36 +0200 Subject: [PATCH] [FLINK-12325][metrics] StatsDReporter properly handles negative values --- .../flink/metrics/statsd/StatsDReporter.java | 59 ++++++++++++++----- .../metrics/statsd/StatsDReporterTest.java | 59 +++++++++++++++++++ 2 files changed, 102 insertions(+), 16 deletions(-) diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index 527f9c1076152..29fbeb9165953 100644 --- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -127,14 +127,20 @@ public void report() { // ------------------------------------------------------------------------ private void reportCounter(final String name, final Counter counter) { - send(name, String.valueOf(counter.getCount())); + send(name, counter.getCount()); } private void reportGauge(final String name, final Gauge gauge) { Object value = gauge.getValue(); - if (value != null) { - send(name, value.toString()); + if (value == null) { + return; } + + if (value instanceof Number) { + send(numberIsNegative((Number) value), name, value.toString()); + } + + send(name, value.toString()); } private void reportHistogram(final String name, final Histogram histogram) { @@ -143,25 +149,25 @@ private void reportHistogram(final String name, final Histogram histogram) { HistogramStatistics statistics = histogram.getStatistics(); if (statistics != null) { - send(prefix(name, "count"), String.valueOf(histogram.getCount())); - send(prefix(name, "max"), String.valueOf(statistics.getMax())); - send(prefix(name, "min"), String.valueOf(statistics.getMin())); - send(prefix(name, "mean"), String.valueOf(statistics.getMean())); - send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev())); - send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5))); - send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75))); - send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95))); - send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98))); - send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99))); - send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999))); + send(prefix(name, "count"), histogram.getCount()); + send(prefix(name, "max"), statistics.getMax()); + send(prefix(name, "min"), statistics.getMin()); + send(prefix(name, "mean"), statistics.getMean()); + send(prefix(name, "stddev"), statistics.getStdDev()); + send(prefix(name, "p50"), statistics.getQuantile(0.5)); + send(prefix(name, "p75"), statistics.getQuantile(0.75)); + send(prefix(name, "p95"), statistics.getQuantile(0.95)); + send(prefix(name, "p98"), statistics.getQuantile(0.98)); + send(prefix(name, "p99"), statistics.getQuantile(0.99)); + send(prefix(name, "p999"), statistics.getQuantile(0.999)); } } } private void reportMeter(final String name, final Meter meter) { if (meter != null) { - send(prefix(name, "rate"), String.valueOf(meter.getRate())); - send(prefix(name, "count"), String.valueOf(meter.getCount())); + send(prefix(name, "rate"), meter.getRate()); + send(prefix(name, "count"), meter.getCount()); } } @@ -179,6 +185,23 @@ private String prefix(String ... names) { } } + private void send(String name, double value) { + send(numberIsNegative(value), name, String.valueOf(value)); + } + + private void send(String name, long value) { + send(value < 0, name, String.valueOf(value)); + } + + private void send(boolean resetToZero, String name, String value) { + if (resetToZero) { + // negative values are interpreted as reductions instead of absolute values + // reset value to 0 before applying reduction as a workaround + send(name, "0"); + } + send(name, value); + } + private void send(final String name, final String value) { try { String formatted = String.format("%s:%s|g", name, value); @@ -216,4 +239,8 @@ public String filterCharacters(String input) { return chars == null ? input : new String(chars, 0, pos); } + + private boolean numberIsNegative(Number input) { + return Double.compare(input.doubleValue(), 0) < 0; + } } diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 61bf07d947654..101326af8f8c3 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -152,6 +152,36 @@ public void testStatsDHistogramReporting() throws Exception { testMetricAndAssert(new TestHistogram(), "metric", expectedLines); } + @Test + public void testStatsDHistogramReportingOfNegativeValues() throws Exception { + TestHistogram histogram = new TestHistogram(); + histogram.setCount(-101); + histogram.setMean(-104); + histogram.setMin(-107); + histogram.setMax(-106); + histogram.setStdDev(-105); + + Set expectedLines = new HashSet<>(); + expectedLines.add("metric.count:0|g"); + expectedLines.add("metric.count:-101|g"); + expectedLines.add("metric.mean:0|g"); + expectedLines.add("metric.mean:-104.0|g"); + expectedLines.add("metric.min:0|g"); + expectedLines.add("metric.min:-107|g"); + expectedLines.add("metric.max:0|g"); + expectedLines.add("metric.max:-106|g"); + expectedLines.add("metric.stddev:0|g"); + expectedLines.add("metric.stddev:-105.0|g"); + expectedLines.add("metric.p75:0.75|g"); + expectedLines.add("metric.p98:0.98|g"); + expectedLines.add("metric.p99:0.99|g"); + expectedLines.add("metric.p999:0.999|g"); + expectedLines.add("metric.p95:0.95|g"); + expectedLines.add("metric.p50:0.5|g"); + + testMetricAndAssert(histogram, "metric", expectedLines); + } + /** * Tests that meters are properly reported via the StatsD reporter. */ @@ -164,6 +194,17 @@ public void testStatsDMetersReporting() throws Exception { testMetricAndAssert(new TestMeter(), "metric", expectedLines); } + @Test + public void testStatsDMetersReportingOfNegativeValues() throws Exception { + Set expectedLines = new HashSet<>(); + expectedLines.add("metric.rate:0|g"); + expectedLines.add("metric.rate:-5.3|g"); + expectedLines.add("metric.count:0|g"); + expectedLines.add("metric.count:-50|g"); + + testMetricAndAssert(new TestMeter(-50, -5.3), "metric", expectedLines); + } + /** * Tests that counter are properly reported via the StatsD reporter. */ @@ -175,6 +216,15 @@ public void testStatsDCountersReporting() throws Exception { testMetricAndAssert(new TestCounter(100), "metric", expectedLines); } + @Test + public void testStatsDCountersReportingOfNegativeValues() throws Exception { + Set expectedLines = new HashSet<>(); + expectedLines.add("metric:0|g"); + expectedLines.add("metric:-51|g"); + + testMetricAndAssert(new TestCounter(-51), "metric", expectedLines); + } + @Test public void testStatsDGaugesReporting() throws Exception { Set expectedLines = new HashSet<>(2); @@ -183,6 +233,15 @@ public void testStatsDGaugesReporting() throws Exception { testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines); } + @Test + public void testStatsDGaugesReportingOfNegativeValues() throws Exception { + Set expectedLines = new HashSet<>(); + expectedLines.add("metric:0|g"); + expectedLines.add("metric:-12345|g"); + + testMetricAndAssert((Gauge) () -> -12345, "metric", expectedLines); + } + private void testMetricAndAssert(Metric metric, String metricName, Set expectation) throws Exception { StatsDReporter reporter = null; DatagramSocketReceiver receiver = null;