diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 3a148e1df7f96..0e514078fffae 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -34,7 +34,7 @@ This method returns a `MetricGroup` object on which you can create and register ### Metric types -Flink supports `Counters`, `Gauges` and `Histograms`. +Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`. #### Counter @@ -155,6 +155,55 @@ public class MyMapper extends RichMapFunction { } {% endhighlight %} +#### Meter + +A `Meter` measures an average throughput. An occurrence of an event can be registered with the `markEvent()` method. Occurrence of multiple events at the same time can be registered with `markEvent(long n)` method. +You can register a meter by calling `meter(String name, Meter meter)` on a `MetricGroup`. + +{% highlight java %} +public class MyMapper extends RichMapFunction { + private Meter meter; + + @Override + public void open(Configuration config) { + this.meter = getRuntimeContext() + .getMetricGroup() + .meter("myMeter", new MyMeter()); + } + + @public Integer map(Long value) throws Exception { + this.meter.markEvent(); + } +} +{% endhighlight %} + +Flink offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard meters. +To use this wrapper add the following dependency in your `pom.xml`: +{% highlight xml %} + + org.apache.flink + flink-metrics-dropwizard + {{site.version}} + +{% endhighlight %} + +You can then register a Codahale/DropWizard meter like this: + +{% highlight java %} +public class MyMapper extends RichMapFunction { + private Meter meter; + + @Override + public void open(Configuration config) { + com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter(); + + this.meter = getRuntimeContext() + .getMetricGroup() + .meter("myMeter", new DropWizardMeterWrapper(meter)); + } +} +{% endhighlight %} + ## Scope Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope. diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java new file mode 100644 index 0000000000000..f9cf7420cf846 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics; + +/** + * Metric for measuring throughput. + */ +public interface Meter extends Metric { + + /** + * Mark occurrence of an event. + */ + void markEvent(); + + /** + * Mark occurrence of multiple events. + * + * @param n number of events occurred + */ + void markEvent(long n); + + /** + * Returns the current rate of events per second. + * + * @return current rate of events per second + */ + double getRate(); + + /** + * Get number of events marked on the meter. + * + * @return number of events marked on the meter + */ + long getCount(); +} diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index 95387ae5378ff..d4221ef76eb65 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -110,6 +110,26 @@ public interface MetricGroup { */ H histogram(int name, H histogram); + /** + * Registers a new {@link Meter} with Flink. + * + * @param name name of the meter + * @param meter meter to register + * @param meter type + * @return the registered meter + */ + M meter(String name, M meter); + + /** + * Registers a new {@link Meter} with Flink. + * + * @param name name of the meter + * @param meter meter to register + * @param meter type + * @return the registered meter + */ + M meter(int name, M meter); + // ------------------------------------------------------------------------ // Groups // ------------------------------------------------------------------------ diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java index 3bbd0f6c17c8a..ea11b43602741 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; @@ -69,6 +70,16 @@ public H histogram(int name, H histogram) { return histogram; } + @Override + public M meter(String name, M meter) { + return meter; + } + + @Override + public M meter(int name, M meter) { + return meter; + } + @Override public H histogram(String name, H histogram) { return histogram; diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java index 7ab8c73971498..0c8d9ada0efa6 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.slf4j.Logger; @@ -39,6 +40,7 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte protected final Map, String> gauges = new HashMap<>(); protected final Map counters = new HashMap<>(); protected final Map histograms = new HashMap<>(); + protected final Map meters = new HashMap<>(); @Override public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { @@ -51,6 +53,8 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr gauges.put((Gauge) metric, name); } else if (metric instanceof Histogram) { histograms.put((Histogram) metric, name); + } else if (metric instanceof Meter) { + meters.put((Meter) metric, name); } else { log.warn("Cannot add unknown metric type {}. This indicates that the reporter " + "does not support this metric type.", metric.getClass().getName()); @@ -67,6 +71,8 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup gauges.remove(metric); } else if (metric instanceof Histogram) { histograms.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); } else { log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + "does not support this metric type.", metric.getClass().getName()); diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java new file mode 100644 index 0000000000000..b44b99688e838 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.util; + +import org.apache.flink.metrics.Meter; + +public class TestMeter implements Meter { + + @Override + public void markEvent() { + } + + @Override + public void markEvent(long n) { + } + + @Override + public double getRate() { + return 5; + } + + @Override + public long getCount() { + return 100L; + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java index ce0299b82b52a..b7e83b6af2817 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -23,14 +23,17 @@ import com.codahale.metrics.ScheduledReporter; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper; import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper; import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper; +import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper; import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; @@ -67,6 +70,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch private final Map, String> gauges = new HashMap<>(); private final Map counters = new HashMap<>(); private final Map histograms = new HashMap<>(); + private final Map meters = new HashMap<>(); // ------------------------------------------------------------------------ @@ -83,6 +87,10 @@ Map getCounters() { return counters; } + Map getMeters() { + return meters; + } + // ------------------------------------------------------------------------ // life cycle // ------------------------------------------------------------------------ @@ -118,10 +126,19 @@ else if (metric instanceof Gauge) { histograms.put(histogram, fullName); if (histogram instanceof DropwizardHistogramWrapper) { - registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram()); + registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram()); } else { registry.register(fullName, new FlinkHistogramWrapper(histogram)); } + } else if (metric instanceof Meter) { + Meter meter = (Meter) metric; + meters.put(meter, fullName); + + if (meter instanceof DropwizardMeterWrapper) { + registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter()); + } else { + registry.register(fullName, new FlinkMeterWrapper(meter)); + } } else { log.warn("Cannot add metric of type {}. This indicates that the reporter " + "does not support this metric type.", metric.getClass().getName()); diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java index 79a6a562245eb..25f77011ffbba 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java @@ -26,28 +26,28 @@ */ public class DropwizardHistogramWrapper implements Histogram { - private final com.codahale.metrics.Histogram dropwizarHistogram; + private final com.codahale.metrics.Histogram dropwizardHistogram; public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) { - this.dropwizarHistogram = dropwizardHistogram; + this.dropwizardHistogram = dropwizardHistogram; } - public com.codahale.metrics.Histogram getDropwizarHistogram() { - return dropwizarHistogram; + public com.codahale.metrics.Histogram getDropwizardHistogram() { + return dropwizardHistogram; } @Override public void update(long value) { - dropwizarHistogram.update(value); + dropwizardHistogram.update(value); } @Override public long getCount() { - return dropwizarHistogram.getCount(); + return dropwizardHistogram.getCount(); } @Override public HistogramStatistics getStatistics() { - return new DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot()); + return new DropwizardHistogramStatistics(dropwizardHistogram.getSnapshot()); } } diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java new file mode 100644 index 0000000000000..4f6fefe66c9c8 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Meter; + +/** + * Wrapper to use a Dropwizard {@link com.codahale.metrics.Meter} as a Flink {@link Meter}. + */ +public class DropwizardMeterWrapper implements Meter { + + private final com.codahale.metrics.Meter meter; + + public DropwizardMeterWrapper(com.codahale.metrics.Meter meter) { + this.meter = meter; + } + + public com.codahale.metrics.Meter getDropwizardMeter() { + return meter; + } + + @Override + public void markEvent() { + meter.mark(); + } + + @Override + public void markEvent(long n) { + meter.mark(n); + } + + @Override + public double getRate() { + return meter.getOneMinuteRate(); + } + + @Override + public long getCount() { + return meter.getCount(); + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java new file mode 100644 index 0000000000000..d0b84838f22b4 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import com.codahale.metrics.Clock; +import org.apache.flink.metrics.Meter; + +/** + * Wrapper to use a Flink {@link Meter} as a Dropwizard {@link com.codahale.metrics.Meter}. + * This is necessary to report Flink's meters via the Dropwizard + * {@link com.codahale.metrics.Reporter}. + */ +public class FlinkMeterWrapper extends com.codahale.metrics.Meter { + + private final Meter meter; + + public FlinkMeterWrapper(Meter meter) { + super(); + this.meter = meter; + } + + public FlinkMeterWrapper(Meter meter, Clock clock) { + super(clock); + this.meter = meter; + } + + @Override + public void mark() { + meter.markEvent(); + } + + @Override + public void mark(long n) { + meter.markEvent(n); + } + + @Override + public long getCount() { + return meter.getCount(); + } + + @Override + public double getOneMinuteRate() { + return meter.getRate(); + } + + @Override + public double getFiveMinuteRate() { + return 0; + } + + @Override + public double getFifteenMinuteRate() { + return 0; + } + + @Override + public double getMeanRate() { + return 0; + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 5979c43da0472..1440028ca28da 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.reporter.MetricReporter; @@ -85,8 +87,11 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new AbstractID(), new AbstractID(), taskName, 0, 0); SimpleCounter myCounter = new SimpleCounter(); + com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter(); + DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter); taskMetricGroup.counter(counterName, myCounter); + taskMetricGroup.meter("meter", meterWrapper); List reporters = metricRegistry.getReporters(); @@ -98,9 +103,11 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter; Map counters = reporter.getCounters(); - assertTrue(counters.containsKey(myCounter)); + Map meters = reporter.getMeters(); + assertTrue(meters.containsKey(meterWrapper)); + String expectedCounterName = reporter.filterCharacters(hostname) + delimiter + reporter.filterCharacters(taskManagerId) diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java new file mode 100644 index 0000000000000..0b8fa5204496d --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DropwizardMeterWrapperTest { + + @Test + public void testWrapper() { + com.codahale.metrics.Meter dropwizardMeter = mock(com.codahale.metrics.Meter.class); + when(dropwizardMeter.getOneMinuteRate()).thenReturn(1.0); + when(dropwizardMeter.getCount()).thenReturn(100L); + + DropwizardMeterWrapper wrapper = new DropwizardMeterWrapper(dropwizardMeter); + + assertEquals(1.0, wrapper.getRate(), 0.00001); + assertEquals(100L, wrapper.getCount()); + } + + @Test + public void testMarkEvent() { + com.codahale.metrics.Meter dropwizardMeter = mock(com.codahale.metrics.Meter.class); + DropwizardMeterWrapper wrapper = new DropwizardMeterWrapper(dropwizardMeter); + wrapper.markEvent(); + + verify(dropwizardMeter).mark(); + } + + @Test + public void testMarkEventN() { + com.codahale.metrics.Meter dropwizardMeter = mock(com.codahale.metrics.Meter.class); + DropwizardMeterWrapper wrapper = new DropwizardMeterWrapper(dropwizardMeter); + wrapper.markEvent(10L); + + verify(dropwizardMeter).mark(10L); + } +} diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java new file mode 100644 index 0000000000000..b6389c5d0ffb5 --- /dev/null +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.util.TestMeter; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class FlinkMeterWrapperTest { + + private static final double DELTA = 0.0001; + + @Test + public void testWrapper() { + Meter meter = new TestMeter(); + + FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter); + assertEquals(0, wrapper.getMeanRate(), DELTA); + assertEquals(5, wrapper.getOneMinuteRate(), DELTA); + assertEquals(0, wrapper.getFiveMinuteRate(), DELTA); + assertEquals(0, wrapper.getFifteenMinuteRate(), DELTA); + assertEquals(100L, wrapper.getCount()); + } + + @Test + public void testMarkOneEvent() { + Meter meter = mock(Meter.class); + + FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter); + wrapper.mark(); + + verify(meter).markEvent(); + } + + @Test + public void testMarkSeveralEvents() { + Meter meter = mock(Meter.class); + + FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter); + wrapper.mark(5); + + verify(meter).markEvent(5); + } +} diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java index 1a283d9244df2..39a5aa29bf6c1 100644 --- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java +++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java @@ -21,6 +21,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; @@ -160,6 +161,8 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr jmxMetric = new JmxCounter((Counter) metric); } else if (metric instanceof Histogram) { jmxMetric = new JmxHistogram((Histogram) metric); + } else if (metric instanceof Meter) { + jmxMetric = new JmxMeter((Meter) metric); } else { LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " + "is not supported by this reporter.", metric.getClass().getName()); @@ -417,6 +420,31 @@ public double get999thPercentile() { } } + public interface JmxMeterMBean extends MetricMBean { + double getRate(); + + long getCount(); + } + + private class JmxMeter extends AbstractBean implements JmxMeterMBean { + + private final Meter meter; + + public JmxMeter(Meter meter) { + this.meter = meter; + } + + @Override + public double getRate() { + return meter.getRate(); + } + + @Override + public long getCount() { + return meter.getCount(); + } + } + /** * JMX Server implementation that JMX clients can connect to. * diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 14ba5ec4abd4a..913999b4ca32c 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -245,6 +246,47 @@ public void testHistogramReporting() throws Exception { } } + /** + * Tests that meters are properly reported via the JMXReporter. + */ + @Test + public void testMeterReporting() throws Exception { + MetricRegistry registry = null; + String meterName = "meter"; + + try { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + + registry = new MetricRegistry(config); + + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + + TestMeter meter = new TestMeter(); + + metricGroup.meter(meterName, meter); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(meterName, metricGroup.getScopeComponents())); + + MBeanInfo info = mBeanServer.getMBeanInfo(objectName); + + MBeanAttributeInfo[] attributeInfos = info.getAttributes(); + + assertEquals(2, attributeInfos.length); + + assertEquals(meter.getRate(), mBeanServer.getAttribute(objectName, "Rate")); + assertEquals(meter.getCount(), mBeanServer.getAttribute(objectName, "Count")); + + } finally { + if (registry != null) { + registry.shutdown(); + } + } + } + static class TestingHistogram implements Histogram { @Override diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index 354eff12ce6ba..977d1b4bc53fd 100644 --- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.AbstractReporter; import org.apache.flink.metrics.reporter.Scheduled; @@ -118,6 +119,10 @@ public void report() { for (Map.Entry entry : histograms.entrySet()) { reportHistogram(entry.getValue(), entry.getKey()); } + + for (Map.Entry entry : meters.entrySet()) { + reportMeter(entry.getValue(), entry.getKey()); + } } catch (ConcurrentModificationException | NoSuchElementException e) { // ignore - may happen when metrics are concurrently added or removed @@ -159,6 +164,13 @@ private void reportHistogram(final String name, final Histogram histogram) { } } + private void reportMeter(final String name, final Meter meter) { + if (meter != null) { + send(prefix(name, "rate"), String.valueOf(meter.getRate())); + send(prefix(name, "count"), String.valueOf(meter.getCount())); + } + } + private String prefix(String ... names) { if (names.length > 0) { StringBuilder stringBuilder = new StringBuilder(names[0]); diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 8c1af0e0b7c7d..ad982f1b1b43e 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.reporter.MetricReporter; @@ -194,6 +195,68 @@ public void testStatsDHistogramReporting() throws Exception { } } + /** + * Tests that meters are properly reported via the StatsD reporter + */ + @Test + public void testStatsDMetersReporting() throws Exception { + MetricRegistry registry = null; + DatagramSocketReceiver receiver = null; + Thread receiverThread = null; + long timeout = 5000; + long joinTimeout = 30000; + + String meterName = "meter"; + + try { + receiver = new DatagramSocketReceiver(); + + receiverThread = new Thread(receiver); + + receiverThread.start(); + + int port = receiver.getPort(); + + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port); + + registry = new MetricRegistry(config); + TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); + TestMeter meter = new TestMeter(); + metricGroup.meter(meterName, meter); + String prefix = metricGroup.getMetricIdentifier(meterName); + + Set expectedLines = new HashSet<>(); + + expectedLines.add(prefix + ".rate:5.0|g"); + expectedLines.add(prefix + ".count:100|g"); + + receiver.waitUntilNumLines(expectedLines.size(), timeout); + + Set lines = receiver.getLines(); + + + assertEquals(expectedLines, lines); + + } finally { + if (registry != null) { + registry.shutdown(); + } + + if (receiver != null) { + receiver.stop(); + } + + if (receiverThread != null) { + receiverThread.join(joinTimeout); + } + } + } + /** * Testing StatsDReporter which disables the socket creation */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index ef2bc0cac6870..89fe3cde069eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; @@ -231,6 +232,17 @@ public H histogram(String name, H histogram) { return histogram; } + @Override + public M meter(int name, M meter) { + return meter(String.valueOf(name), meter); + } + + @Override + public M meter(String name, M meter) { + addMetric(name, meter); + return meter; + } + /** * Adds the given metric to the group and registers it at the registry, if the group * is not yet closed, and if no metric with the same name has been registered before. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java index 633c4af547056..0ebf749c6efef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import java.util.Map; @@ -81,6 +82,16 @@ public final H histogram(int name, H histogram) { return parentMetricGroup.histogram(name, histogram); } + @Override + public M meter(String name, M meter) { + return parentMetricGroup.meter(name, meter); + } + + @Override + public M meter(int name, M meter) { + return parentMetricGroup.meter(name, meter); + } + @Override public final MetricGroup addGroup(int name) { return parentMetricGroup.addGroup(name);