Skip to content

Commit

Permalink
[FLINK-7907][docs][metrics] Add scala examples
Browse files Browse the repository at this point in the history
This closes apache#7907.
  • Loading branch information
yew1eb authored and zentol committed Dec 4, 2017
1 parent dbb77ed commit f4e4cd6
Showing 1 changed file with 198 additions and 14 deletions.
212 changes: 198 additions & 14 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`.
A `Counter` is used to count something. The current value can be in- or decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
You can create and register a `Counter` by calling `counter(String name)` on a `MetricGroup`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

public class MyMapper extends RichMapFunction<String, String> {
private Counter counter;
private transient Counter counter;

@Override
public void open(Configuration config) {
Expand All @@ -61,13 +63,39 @@ public class MyMapper extends RichMapFunction<String, String> {
}

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter

override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}

override def map(value: String): String = {
counter.inc()
value
}
}

{% endhighlight %}
</div>

</div>

Alternatively you can also use your own `Counter` implementation:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

public class MyMapper extends RichMapFunction<String, String> {
private Counter counter;
private transient Counter counter;

@Override
public void open(Configuration config) {
Expand All @@ -83,7 +111,32 @@ public class MyMapper extends RichMapFunction<String, String> {
}
}


{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter

override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter())
}

override def map(value: String): String = {
counter.inc()
value
}
}

{% endhighlight %}
</div>

</div>

#### Gauge

Expand All @@ -96,7 +149,7 @@ You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `Metr
{% highlight java %}

public class MyMapper extends RichMapFunction<String, String> {
private int valueToExpose = 0;
private transient int valueToExpose = 0;

@Override
public void open(Configuration config) {
Expand All @@ -123,8 +176,8 @@ public class MyMapper extends RichMapFunction<String, String> {
<div data-lang="scala" markdown="1">
{% highlight scala %}

public class MyMapper extends RichMapFunction[String,String] {
val valueToExpose = 0
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0

override def open(parameters: Configuration): Unit = {
getRuntimeContext()
Expand All @@ -150,9 +203,11 @@ Note that reporters will turn the exposed object into a `String`, which means th
A `Histogram` measures the distribution of long values.
You can register one by calling `histogram(String name, Histogram histogram)` on a `MetricGroup`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Long> {
private Histogram histogram;
private transient Histogram histogram;

@Override
public void open(Configuration config) {
Expand All @@ -168,6 +223,30 @@ public class MyMapper extends RichMapFunction<Long, Long> {
}
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[Long,Long] {
@transient private var histogram: Histogram

override def open(parameters: Configuration): Unit = {
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram())
}

override def map(value: Long): Long = {
histogram.update(value)
value
}
}

{% endhighlight %}
</div>

</div>

Flink does not provide a default implementation for `Histogram`, but offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
To use this wrapper add the following dependency in your `pom.xml`:
Expand All @@ -181,30 +260,67 @@ To use this wrapper add the following dependency in your `pom.xml`:

You can then register a Codahale/DropWizard histogram like this:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram histogram =
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
}

@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram

override def open(config: Configuration): Unit = {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}

override def map(value: Long): Long = {
histogram.update(value)
value
}
}

{% endhighlight %}
</div>

</div>

#### Meter

A `Meter` measures an average throughput. An occurrence of an event can be registered with the `markEvent()` method. Occurrence of multiple events at the same time can be registered with `markEvent(long n)` method.
You can register a meter by calling `meter(String name, Meter meter)` on a `MetricGroup`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Long> {
private Meter meter;
private transient Meter meter;

@Override
public void open(Configuration config) {
Expand All @@ -220,6 +336,30 @@ public class MyMapper extends RichMapFunction<Long, Long> {
}
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter

override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}

override def map(value: Long): Long = {
meter.markEvent()
value
}
}

{% endhighlight %}
</div>

</div>

Flink offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard meters.
To use this wrapper add the following dependency in your `pom.xml`:
Expand All @@ -233,17 +373,19 @@ To use this wrapper add the following dependency in your `pom.xml`:

You can then register a Codahale/DropWizard meter like this:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class MyMapper extends RichMapFunction<Long, Long> {
private Meter meter;
private transient Meter meter;

@Override
public void open(Configuration config) {
com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(meter));
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
}

@Override
Expand All @@ -253,6 +395,32 @@ public class MyMapper extends RichMapFunction<Long, Long> {
}
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter

override def open(config: Configuration): Unit = {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()

meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}

override def map(value: Long): Long = {
meter.markEvent()
value
}
}

{% endhighlight %}
</div>

</div>

## Scope

Expand All @@ -265,6 +433,8 @@ You can configure which delimiter to use for the identifier (default: `.`) by se

You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

counter = getRuntimeContext()
Expand All @@ -273,6 +443,20 @@ counter = getRuntimeContext()
.counter("myCounter");

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter")

{% endhighlight %}
</div>

</div>

### System Scope

Expand Down

0 comments on commit f4e4cd6

Please sign in to comment.