diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 0bcab55704bcf..7963e1a015468 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -41,10 +41,12 @@ Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`. A `Counter` is used to count something. The current value can be in- or decremented using `inc()/inc(long n)` or `dec()/dec(long n)`. You can create and register a `Counter` by calling `counter(String name)` on a `MetricGroup`. +
+
{% highlight java %} public class MyMapper extends RichMapFunction { - private Counter counter; + private transient Counter counter; @Override public void open(Configuration config) { @@ -61,13 +63,39 @@ public class MyMapper extends RichMapFunction { } {% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[String,String] { + @transient private var counter: Counter + + override def open(parameters: Configuration): Unit = { + counter = getRuntimeContext() + .getMetricGroup() + .counter("myCounter") + } + + override def map(value: String): String = { + counter.inc() + value + } +} + +{% endhighlight %} +
+ +
Alternatively you can also use your own `Counter` implementation: +
+
{% highlight java %} public class MyMapper extends RichMapFunction { - private Counter counter; + private transient Counter counter; @Override public void open(Configuration config) { @@ -83,7 +111,32 @@ public class MyMapper extends RichMapFunction { } } + +{% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[String,String] { + @transient private var counter: Counter + + override def open(parameters: Configuration): Unit = { + counter = getRuntimeContext() + .getMetricGroup() + .counter("myCustomCounter", new CustomCounter()) + } + + override def map(value: String): String = { + counter.inc() + value + } +} + {% endhighlight %} +
+ +
#### Gauge @@ -96,7 +149,7 @@ You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `Metr {% highlight java %} public class MyMapper extends RichMapFunction { - private int valueToExpose = 0; + private transient int valueToExpose = 0; @Override public void open(Configuration config) { @@ -123,8 +176,8 @@ public class MyMapper extends RichMapFunction {
{% highlight scala %} -public class MyMapper extends RichMapFunction[String,String] { - val valueToExpose = 0 +new class MyMapper extends RichMapFunction[String,String] { + @transient private var valueToExpose = 0 override def open(parameters: Configuration): Unit = { getRuntimeContext() @@ -150,9 +203,11 @@ Note that reporters will turn the exposed object into a `String`, which means th A `Histogram` measures the distribution of long values. You can register one by calling `histogram(String name, Histogram histogram)` on a `MetricGroup`. +
+
{% highlight java %} public class MyMapper extends RichMapFunction { - private Histogram histogram; + private transient Histogram histogram; @Override public void open(Configuration config) { @@ -168,6 +223,30 @@ public class MyMapper extends RichMapFunction { } } {% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[Long,Long] { + @transient private var histogram: Histogram + + override def open(parameters: Configuration): Unit = { + histogram = getRuntimeContext() + .getMetricGroup() + .histogram("myHistogram", new MyHistogram()) + } + + override def map(value: Long): Long = { + histogram.update(value) + value + } +} + +{% endhighlight %} +
+ +
Flink does not provide a default implementation for `Histogram`, but offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard histograms. To use this wrapper add the following dependency in your `pom.xml`: @@ -181,30 +260,67 @@ To use this wrapper add the following dependency in your `pom.xml`: You can then register a Codahale/DropWizard histogram like this: +
+
{% highlight java %} -public class MyMapper extends RichMapFunction { - private Histogram histogram; +public class MyMapper extends RichMapFunction { + private transient Histogram histogram; @Override public void open(Configuration config) { - com.codahale.metrics.Histogram histogram = + com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)); this.histogram = getRuntimeContext() .getMetricGroup() - .histogram("myHistogram", new DropwizardHistogramWrapper(histogram)); + .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram)); + } + + @Override + public Long map(Long value) throws Exception { + this.histogram.update(value); + return value; + } +} +{% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[Long, Long] { + @transient private var histogram: Histogram + + override def open(config: Configuration): Unit = { + com.codahale.metrics.Histogram dropwizardHistogram = + new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)) + + histogram = getRuntimeContext() + .getMetricGroup() + .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram)) + } + + override def map(value: Long): Long = { + histogram.update(value) + value } } + {% endhighlight %} +
+ +
#### Meter A `Meter` measures an average throughput. An occurrence of an event can be registered with the `markEvent()` method. Occurrence of multiple events at the same time can be registered with `markEvent(long n)` method. You can register a meter by calling `meter(String name, Meter meter)` on a `MetricGroup`. +
+
{% highlight java %} public class MyMapper extends RichMapFunction { - private Meter meter; + private transient Meter meter; @Override public void open(Configuration config) { @@ -220,6 +336,30 @@ public class MyMapper extends RichMapFunction { } } {% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[Long,Long] { + @transient private var meter: Meter + + override def open(config: Configuration): Unit = { + meter = getRuntimeContext() + .getMetricGroup() + .meter("myMeter", new MyMeter()) + } + + override def map(value: Long): Long = { + meter.markEvent() + value + } +} + +{% endhighlight %} +
+ +
Flink offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard meters. To use this wrapper add the following dependency in your `pom.xml`: @@ -233,17 +373,19 @@ To use this wrapper add the following dependency in your `pom.xml`: You can then register a Codahale/DropWizard meter like this: +
+
{% highlight java %} public class MyMapper extends RichMapFunction { - private Meter meter; + private transient Meter meter; @Override public void open(Configuration config) { - com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter(); + com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter(); this.meter = getRuntimeContext() .getMetricGroup() - .meter("myMeter", new DropwizardMeterWrapper(meter)); + .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter)); } @Override @@ -253,6 +395,32 @@ public class MyMapper extends RichMapFunction { } } {% endhighlight %} +
+ +
+{% highlight scala %} + +class MyMapper extends RichMapFunction[Long,Long] { + @transient private var meter: Meter + + override def open(config: Configuration): Unit = { + com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter() + + meter = getRuntimeContext() + .getMetricGroup() + .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter)) + } + + override def map(value: Long): Long = { + meter.markEvent() + value + } +} + +{% endhighlight %} +
+ +
## Scope @@ -265,6 +433,8 @@ You can configure which delimiter to use for the identifier (default: `.`) by se You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`. +
+
{% highlight java %} counter = getRuntimeContext() @@ -273,6 +443,20 @@ counter = getRuntimeContext() .counter("myCounter"); {% endhighlight %} +
+ +
+{% highlight scala %} + +counter = getRuntimeContext() + .getMetricGroup() + .addGroup("MyMetrics") + .counter("myCounter") + +{% endhighlight %} +
+ +
### System Scope