Skip to content

Commit

Permalink
[FLINK-12325][metrics] StatsDReporter properly handles negative values
Browse files Browse the repository at this point in the history
  • Loading branch information
Xeli authored and zentol committed May 9, 2019
1 parent c636c53 commit 92b1a66
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,20 @@ public void report() {
// ------------------------------------------------------------------------

private void reportCounter(final String name, final Counter counter) {
send(name, String.valueOf(counter.getCount()));
send(name, counter.getCount());
}

private void reportGauge(final String name, final Gauge<?> gauge) {
Object value = gauge.getValue();
if (value != null) {
send(name, value.toString());
if (value == null) {
return;
}

if (value instanceof Number) {
send(numberIsNegative((Number) value), name, value.toString());
}

send(name, value.toString());
}

private void reportHistogram(final String name, final Histogram histogram) {
Expand All @@ -143,25 +149,25 @@ private void reportHistogram(final String name, final Histogram histogram) {
HistogramStatistics statistics = histogram.getStatistics();

if (statistics != null) {
send(prefix(name, "count"), String.valueOf(histogram.getCount()));
send(prefix(name, "max"), String.valueOf(statistics.getMax()));
send(prefix(name, "min"), String.valueOf(statistics.getMin()));
send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
send(prefix(name, "count"), histogram.getCount());
send(prefix(name, "max"), statistics.getMax());
send(prefix(name, "min"), statistics.getMin());
send(prefix(name, "mean"), statistics.getMean());
send(prefix(name, "stddev"), statistics.getStdDev());
send(prefix(name, "p50"), statistics.getQuantile(0.5));
send(prefix(name, "p75"), statistics.getQuantile(0.75));
send(prefix(name, "p95"), statistics.getQuantile(0.95));
send(prefix(name, "p98"), statistics.getQuantile(0.98));
send(prefix(name, "p99"), statistics.getQuantile(0.99));
send(prefix(name, "p999"), statistics.getQuantile(0.999));
}
}
}

private void reportMeter(final String name, final Meter meter) {
if (meter != null) {
send(prefix(name, "rate"), String.valueOf(meter.getRate()));
send(prefix(name, "count"), String.valueOf(meter.getCount()));
send(prefix(name, "rate"), meter.getRate());
send(prefix(name, "count"), meter.getCount());
}
}

Expand All @@ -179,6 +185,23 @@ private String prefix(String ... names) {
}
}

private void send(String name, double value) {
send(numberIsNegative(value), name, String.valueOf(value));
}

private void send(String name, long value) {
send(value < 0, name, String.valueOf(value));
}

private void send(boolean resetToZero, String name, String value) {
if (resetToZero) {
// negative values are interpreted as reductions instead of absolute values
// reset value to 0 before applying reduction as a workaround
send(name, "0");
}
send(name, value);
}

private void send(final String name, final String value) {
try {
String formatted = String.format("%s:%s|g", name, value);
Expand Down Expand Up @@ -216,4 +239,8 @@ public String filterCharacters(String input) {

return chars == null ? input : new String(chars, 0, pos);
}

private boolean numberIsNegative(Number input) {
return Double.compare(input.doubleValue(), 0) < 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,36 @@ public void testStatsDHistogramReporting() throws Exception {
testMetricAndAssert(new TestHistogram(), "metric", expectedLines);
}

@Test
public void testStatsDHistogramReportingOfNegativeValues() throws Exception {
TestHistogram histogram = new TestHistogram();
histogram.setCount(-101);
histogram.setMean(-104);
histogram.setMin(-107);
histogram.setMax(-106);
histogram.setStdDev(-105);

Set<String> expectedLines = new HashSet<>();
expectedLines.add("metric.count:0|g");
expectedLines.add("metric.count:-101|g");
expectedLines.add("metric.mean:0|g");
expectedLines.add("metric.mean:-104.0|g");
expectedLines.add("metric.min:0|g");
expectedLines.add("metric.min:-107|g");
expectedLines.add("metric.max:0|g");
expectedLines.add("metric.max:-106|g");
expectedLines.add("metric.stddev:0|g");
expectedLines.add("metric.stddev:-105.0|g");
expectedLines.add("metric.p75:0.75|g");
expectedLines.add("metric.p98:0.98|g");
expectedLines.add("metric.p99:0.99|g");
expectedLines.add("metric.p999:0.999|g");
expectedLines.add("metric.p95:0.95|g");
expectedLines.add("metric.p50:0.5|g");

testMetricAndAssert(histogram, "metric", expectedLines);
}

/**
* Tests that meters are properly reported via the StatsD reporter.
*/
Expand All @@ -164,6 +194,17 @@ public void testStatsDMetersReporting() throws Exception {
testMetricAndAssert(new TestMeter(), "metric", expectedLines);
}

@Test
public void testStatsDMetersReportingOfNegativeValues() throws Exception {
Set<String> expectedLines = new HashSet<>();
expectedLines.add("metric.rate:0|g");
expectedLines.add("metric.rate:-5.3|g");
expectedLines.add("metric.count:0|g");
expectedLines.add("metric.count:-50|g");

testMetricAndAssert(new TestMeter(-50, -5.3), "metric", expectedLines);
}

/**
* Tests that counter are properly reported via the StatsD reporter.
*/
Expand All @@ -175,6 +216,15 @@ public void testStatsDCountersReporting() throws Exception {
testMetricAndAssert(new TestCounter(100), "metric", expectedLines);
}

@Test
public void testStatsDCountersReportingOfNegativeValues() throws Exception {
Set<String> expectedLines = new HashSet<>();
expectedLines.add("metric:0|g");
expectedLines.add("metric:-51|g");

testMetricAndAssert(new TestCounter(-51), "metric", expectedLines);
}

@Test
public void testStatsDGaugesReporting() throws Exception {
Set<String> expectedLines = new HashSet<>(2);
Expand All @@ -183,6 +233,15 @@ public void testStatsDGaugesReporting() throws Exception {
testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines);
}

@Test
public void testStatsDGaugesReportingOfNegativeValues() throws Exception {
Set<String> expectedLines = new HashSet<>();
expectedLines.add("metric:0|g");
expectedLines.add("metric:-12345|g");

testMetricAndAssert((Gauge) () -> -12345, "metric", expectedLines);
}

private void testMetricAndAssert(Metric metric, String metricName, Set<String> expectation) throws Exception {
StatsDReporter reporter = null;
DatagramSocketReceiver receiver = null;
Expand Down

0 comments on commit 92b1a66

Please sign in to comment.