Skip to content

Commit

Permalink
[FLINK-3950] Add Meter interface
Browse files Browse the repository at this point in the history
This closes apache#2374
  • Loading branch information
mushketyk authored and zentol committed Aug 29, 2016
1 parent 6226108 commit 8195001
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 10 deletions.
51 changes: 50 additions & 1 deletion docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ This method returns a `MetricGroup` object on which you can create and register

### Metric types

Flink supports `Counters`, `Gauges` and `Histograms`.
Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`.

#### Counter

Expand Down Expand Up @@ -155,6 +155,55 @@ public class MyMapper extends RichMapFunction<Long, Integer> {
}
{% endhighlight %}

#### Meter

A `Meter` measures an average throughput. An occurrence of an event can be registered with the `markEvent()` method. Occurrence of multiple events at the same time can be registered with `markEvent(long n)` method.
You can register a meter by calling `meter(String name, Meter meter)` on a `MetricGroup`.

{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;

@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}

@public Integer map(Long value) throws Exception {
this.meter.markEvent();
}
}
{% endhighlight %}

Flink offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard meters.
To use this wrapper add the following dependency in your `pom.xml`:
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>{{site.version}}</version>
</dependency>
{% endhighlight %}

You can then register a Codahale/DropWizard meter like this:

{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;

@Override
public void open(Configuration config) {
com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();

this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropWizardMeterWrapper(meter));
}
}
{% endhighlight %}

## Scope

Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics;

/**
* Metric for measuring throughput.
*/
public interface Meter extends Metric {

/**
* Mark occurrence of an event.
*/
void markEvent();

/**
* Mark occurrence of multiple events.
*
* @param n number of events occurred
*/
void markEvent(long n);

/**
* Returns the current rate of events per second.
*
* @return current rate of events per second
*/
double getRate();

/**
* Get number of events marked on the meter.
*
* @return number of events marked on the meter
*/
long getCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ public interface MetricGroup {
*/
<H extends Histogram> H histogram(int name, H histogram);

/**
* Registers a new {@link Meter} with Flink.
*
* @param name name of the meter
* @param meter meter to register
* @param <M> meter type
* @return the registered meter
*/
<M extends Meter> M meter(String name, M meter);

/**
* Registers a new {@link Meter} with Flink.
*
* @param name name of the meter
* @param meter meter to register
* @param <M> meter type
* @return the registered meter
*/
<M extends Meter> M meter(int name, M meter);

// ------------------------------------------------------------------------
// Groups
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

Expand Down Expand Up @@ -69,6 +70,16 @@ public <H extends Histogram> H histogram(int name, H histogram) {
return histogram;
}

@Override
public <M extends Meter> M meter(String name, M meter) {
return meter;
}

@Override
public <M extends Meter> M meter(int name, M meter) {
return meter;
}

@Override
public <H extends Histogram> H histogram(String name, H histogram) {
return histogram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
Expand All @@ -39,6 +40,7 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte
protected final Map<Gauge<?>, String> gauges = new HashMap<>();
protected final Map<Counter, String> counters = new HashMap<>();
protected final Map<Histogram, String> histograms = new HashMap<>();
protected final Map<Meter, String> meters = new HashMap<>();

@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
Expand All @@ -51,6 +53,8 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
gauges.put((Gauge<?>) metric, name);
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, name);
} else if (metric instanceof Meter) {
meters.put((Meter) metric, name);
} else {
log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
Expand All @@ -67,6 +71,8 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup
gauges.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else if (metric instanceof Meter) {
meters.remove(metric);
} else {
log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.util;

import org.apache.flink.metrics.Meter;

public class TestMeter implements Meter {

@Override
public void markEvent() {
}

@Override
public void markEvent(long n) {
}

@Override
public double getRate() {
return 5;
}

@Override
public long getCount() {
return 100L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import com.codahale.metrics.ScheduledReporter;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -67,6 +70,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
private final Map<Gauge<?>, String> gauges = new HashMap<>();
private final Map<Counter, String> counters = new HashMap<>();
private final Map<Histogram, String> histograms = new HashMap<>();
private final Map<Meter, String> meters = new HashMap<>();

// ------------------------------------------------------------------------

Expand All @@ -83,6 +87,10 @@ Map<Counter, String> getCounters() {
return counters;
}

Map<Meter, String> getMeters() {
return meters;
}

// ------------------------------------------------------------------------
// life cycle
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -118,10 +126,19 @@ else if (metric instanceof Gauge) {
histograms.put(histogram, fullName);

if (histogram instanceof DropwizardHistogramWrapper) {
registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
} else {
registry.register(fullName, new FlinkHistogramWrapper(histogram));
}
} else if (metric instanceof Meter) {
Meter meter = (Meter) metric;
meters.put(meter, fullName);

if (meter instanceof DropwizardMeterWrapper) {
registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
} else {
registry.register(fullName, new FlinkMeterWrapper(meter));
}
} else {
log.warn("Cannot add metric of type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@
*/
public class DropwizardHistogramWrapper implements Histogram {

private final com.codahale.metrics.Histogram dropwizarHistogram;
private final com.codahale.metrics.Histogram dropwizardHistogram;

public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
this.dropwizarHistogram = dropwizardHistogram;
this.dropwizardHistogram = dropwizardHistogram;
}

public com.codahale.metrics.Histogram getDropwizarHistogram() {
return dropwizarHistogram;
public com.codahale.metrics.Histogram getDropwizardHistogram() {
return dropwizardHistogram;
}

@Override
public void update(long value) {
dropwizarHistogram.update(value);
dropwizardHistogram.update(value);
}

@Override
public long getCount() {
return dropwizarHistogram.getCount();
return dropwizardHistogram.getCount();
}

@Override
public HistogramStatistics getStatistics() {
return new DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot());
return new DropwizardHistogramStatistics(dropwizardHistogram.getSnapshot());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.dropwizard.metrics;

import org.apache.flink.metrics.Meter;

/**
* Wrapper to use a Dropwizard {@link com.codahale.metrics.Meter} as a Flink {@link Meter}.
*/
public class DropwizardMeterWrapper implements Meter {

private final com.codahale.metrics.Meter meter;

public DropwizardMeterWrapper(com.codahale.metrics.Meter meter) {
this.meter = meter;
}

public com.codahale.metrics.Meter getDropwizardMeter() {
return meter;
}

@Override
public void markEvent() {
meter.mark();
}

@Override
public void markEvent(long n) {
meter.mark(n);
}

@Override
public double getRate() {
return meter.getOneMinuteRate();
}

@Override
public long getCount() {
return meter.getCount();
}
}
Loading

0 comments on commit 8195001

Please sign in to comment.