Skip to content

Commit

Permalink
[FLINK-10243][metrics] Make latency metrics granularity configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 7, 2018
1 parent f9f80a2 commit 91f8fe8
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/metric_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>metrics.latency.granularity</h5></td>
<td style="word-wrap: break-word;">"operator"</td>
<td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td>
</tr>
<tr>
<td><h5>metrics.latency.history-size</h5></td>
<td style="word-wrap: break-word;">128</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;

/**
* Configuration options for metrics and metric reporters.
Expand Down Expand Up @@ -111,6 +112,17 @@ public class MetricOptions {
" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
" impact the performance of the cluster.");

/**
 * The granularity of latency metrics; accepted values are {@code single}, {@code operator}
 * and {@code subtask}, with {@code operator} being the default.
 */
public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY =
key("metrics.latency.granularity")
.defaultValue("operator")
.withDescription(Description.builder()
.text("Defines the granularity of latency metrics. Accepted values are:")
.list(
text("single - Track latency without differentiating between sources and subtasks."),
text("operator - Track latency while differentiating between sources, but not subtasks."),
text("subtask - Track latency while differentiating between sources and subtasks."))
.build());

/** The number of measured latencies to maintain at each operator. */
public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
key("metrics.latency.history-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

import java.io.Closeable;
import java.io.Serializable;
import java.util.Locale;

/**
* Base class for all stream operators. Operators that contain a user function should extend the class
Expand Down Expand Up @@ -193,11 +194,33 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
}

final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
LatencyStats.Granularity granularity;
try {
granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException iae) {
granularity = LatencyStats.Granularity.OPERATOR;
LOG.warn(
"Configured value {} option for {} is invalid. Defaulting to {}.",
configuredGranularity,
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
historySize,
container.getIndexInSubtaskGroup(),
getOperatorID(),
granularity);
} catch (Exception e) {
LOG.warn("An error occurred while instantiating latency metrics.", e);
this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
this.latencyStats = new LatencyStats(
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
1,
0,
new OperatorID(),
LatencyStats.Granularity.SINGLE);
}

this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,29 @@ public class LatencyStats {
private final int historySize;
private final int subtaskIndex;
private final OperatorID operatorId;
private final Granularity granularity;

public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) {
/**
 * Creates the latency statistics holder; the constructor only stores its arguments.
 *
 * @param metricGroup group under which latency histograms are registered
 * @param historySize size handed to every newly created latency histogram
 * @param subtaskIndex subtask index reported in the {@code operator_subtask_index} group
 * @param operatorID operator ID reported in the {@code operator_id} group
 * @param granularity decides the histogram key and the source-related metric groups
 */
public LatencyStats(
MetricGroup metricGroup,
int historySize,
int subtaskIndex,
OperatorID operatorID,
Granularity granularity) {
this.metricGroup = metricGroup;
this.historySize = historySize;
this.subtaskIndex = subtaskIndex;
this.operatorId = operatorID;
this.granularity = granularity;
}

public void reportLatency(LatencyMarker marker) {
String uniqueName = "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex;
final String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);

DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
if (latencyHistogram == null) {
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);
this.metricGroup
.addGroup("source_id", String.valueOf(marker.getOperatorId()))
.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()))
granularity.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
.addGroup("operator_id", String.valueOf(operatorId))
.addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
.histogram("latency", latencyHistogram);
Expand All @@ -59,4 +65,62 @@ public void reportLatency(LatencyMarker marker) {
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime());
}

/**
 * Granularity for latency metrics.
 *
 * <p>Each constant decides which attributes of a marker's source become part of the
 * histogram key and of the metric group hierarchy.
 */
public enum Granularity {
	/** Ignores the marker's source entirely: neither source ID nor source subtask is used. */
	SINGLE {
		@Override
		String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
			final String operatorPart = String.valueOf(operatorId);
			return operatorPart + operatorSubtaskIndex;
		}

		@Override
		MetricGroup createSourceMetricGroups(
				MetricGroup base,
				LatencyMarker marker,
				OperatorID operatorId,
				int operatorSubtaskIndex) {
			// No source-specific groups at this granularity.
			return base;
		}
	},
	/** Differentiates by the marker's source ID, but not by the source subtask index. */
	OPERATOR {
		@Override
		String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
			final String sourcePart = String.valueOf(marker.getOperatorId());
			return sourcePart + operatorId + operatorSubtaskIndex;
		}

		@Override
		MetricGroup createSourceMetricGroups(
				MetricGroup base,
				LatencyMarker marker,
				OperatorID operatorId,
				int operatorSubtaskIndex) {
			final String sourceId = String.valueOf(marker.getOperatorId());
			return base.addGroup("source_id", sourceId);
		}
	},
	/** Differentiates by both the marker's source ID and its source subtask index. */
	SUBTASK {
		@Override
		String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
			final String sourcePart = String.valueOf(marker.getOperatorId());
			return sourcePart + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
		}

		@Override
		MetricGroup createSourceMetricGroups(
				MetricGroup base,
				LatencyMarker marker,
				OperatorID operatorId,
				int operatorSubtaskIndex) {
			final MetricGroup sourceGroup =
				base.addGroup("source_id", String.valueOf(marker.getOperatorId()));
			return sourceGroup.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
		}
	};

	/**
	 * Builds the key under which the histogram for the given marker is cached.
	 *
	 * @param marker latency marker whose source attributes may contribute to the key
	 * @param operatorId ID of the receiving operator
	 * @param operatorSubtaskIndex subtask index of the receiving operator
	 * @return the histogram key for this granularity
	 */
	abstract String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);

	/**
	 * Adds the source-related sub-groups for the given marker to {@code base}.
	 *
	 * @param base group to which the source groups are added
	 * @param marker latency marker providing the source attributes
	 * @param operatorId ID of the receiving operator
	 * @param operatorSubtaskIndex subtask index of the receiving operator
	 * @return the innermost group created, or {@code base} if none was added
	 */
	abstract MetricGroup createSourceMetricGroups(MetricGroup base, LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Tests for the {@link LatencyStats}.
 *
 * <p>Every test reports the same five latency markers (three from the first source, two from
 * the second, spread over four source subtasks) and then checks which histograms were
 * registered for the granularity under test.
 */
public class LatencyStatsTest extends TestLogger {

	private static final OperatorID OPERATOR_ID = new OperatorID();
	private static final OperatorID SOURCE_ID_1 = new OperatorID();
	private static final OperatorID SOURCE_ID_2 = new OperatorID();

	private static final int OPERATOR_SUBTASK_INDEX = 64;

	private static final String PARENT_GROUP_NAME = "parent";

	@Test
	public void testLatencyStatsSingle() {
		testLatencyStats(LatencyStats.Granularity.SINGLE, histograms -> {
			Assert.assertEquals(1, histograms.size());

			// All five markers end up in one histogram without any source groups.
			final Tuple2<String, Histogram> only = histograms.get(0);
			assertName(only.f0);
			Assert.assertEquals(5, only.f1.getCount());
		});
	}

	@Test
	public void testLatencyStatsOperator() {
		testLatencyStats(LatencyStats.Granularity.OPERATOR, histograms -> {
			Assert.assertEquals(2, histograms.size());

			assertOperatorRegistration(histograms.get(0), SOURCE_ID_1, 3);
			assertOperatorRegistration(histograms.get(1), SOURCE_ID_2, 2);
		});
	}

	@Test
	public void testLatencyStatsSubtask() {
		testLatencyStats(LatencyStats.Granularity.SUBTASK, histograms -> {
			Assert.assertEquals(4, histograms.size());

			assertSubtaskRegistration(histograms.get(0), SOURCE_ID_1, 0, 2);
			assertSubtaskRegistration(histograms.get(1), SOURCE_ID_1, 1, 1);
			assertSubtaskRegistration(histograms.get(2), SOURCE_ID_2, 2, 1);
			assertSubtaskRegistration(histograms.get(3), SOURCE_ID_2, 3, 1);
		});
	}

	/**
	 * Reports the fixed set of markers with the given granularity and hands the recorded
	 * histogram registrations to the verifier.
	 */
	private static void testLatencyStats(
			final LatencyStats.Granularity granularity,
			final Consumer<List<Tuple2<String, Histogram>>> verifier) {

		final AbstractMetricGroup<?> unregisteredParent = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
		final TestMetricRegistry recordingRegistry = new TestMetricRegistry();
		final MetricGroup parentGroup = new GenericMetricGroup(recordingRegistry, unregisteredParent, PARENT_GROUP_NAME);

		final LatencyStats stats = new LatencyStats(
			parentGroup,
			MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
			OPERATOR_SUBTASK_INDEX,
			OPERATOR_ID,
			granularity);

		reportMarker(stats, SOURCE_ID_1, 0);
		reportMarker(stats, SOURCE_ID_1, 0);
		reportMarker(stats, SOURCE_ID_1, 1);
		reportMarker(stats, SOURCE_ID_2, 2);
		reportMarker(stats, SOURCE_ID_2, 3);

		verifier.accept(recordingRegistry.latencyHistograms);
	}

	/**
	 * Reports a single marker with marked time 0 for the given source and source subtask.
	 */
	private static void reportMarker(final LatencyStats stats, final OperatorID sourceId, final int sourceSubtaskIndex) {
		stats.reportLatency(new LatencyMarker(0L, sourceId, sourceSubtaskIndex));
	}

	/**
	 * Checks name and sample count of a registration made with operator granularity.
	 */
	private static void assertOperatorRegistration(
			final Tuple2<String, Histogram> registration,
			final OperatorID sourceId,
			final long expectedCount) {
		assertName(registration.f0, sourceId);
		Assert.assertEquals(expectedCount, registration.f1.getCount());
	}

	/**
	 * Checks name and sample count of a registration made with subtask granularity.
	 */
	private static void assertSubtaskRegistration(
			final Tuple2<String, Histogram> registration,
			final OperatorID sourceId,
			final int sourceIndex,
			final long expectedCount) {
		assertName(registration.f0, sourceId, sourceIndex);
		Assert.assertEquals(expectedCount, registration.f1.getCount());
	}

	/**
	 * Removes all parts from the metric identifier preceding the latency-related parts.
	 */
	private static String sanitizeName(final String registrationName) {
		final int parentStart = registrationName.lastIndexOf(PARENT_GROUP_NAME);
		// Skip the parent group name and the delimiter that follows it.
		final int latencyPartStart = parentStart + PARENT_GROUP_NAME.length() + 1;
		return registrationName.substring(latencyPartStart);
	}

	/**
	 * Returns the identifier part contributed by the receiving operator and the metric name.
	 */
	private static String operatorScope() {
		return "operator_id." + OPERATOR_ID +
			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
			".latency";
	}

	private static void assertSanitizedName(final String expected, final String registrationName) {
		Assert.assertEquals(expected, sanitizeName(registrationName));
	}

	private static void assertName(final String registrationName) {
		assertSanitizedName(operatorScope(), registrationName);
	}

	private static void assertName(final String registrationName, final OperatorID sourceId) {
		assertSanitizedName(
			"source_id." + sourceId + "." + operatorScope(),
			registrationName);
	}

	private static void assertName(final String registrationName, final OperatorID sourceId, final int sourceIndex) {
		assertSanitizedName(
			"source_id." + sourceId + ".source_subtask_index." + sourceIndex + "." + operatorScope(),
			registrationName);
	}

	/**
	 * Registry that records every registered {@link Histogram} together with its metric
	 * identifier, in registration order. All other operations are no-ops.
	 */
	private static class TestMetricRegistry implements MetricRegistry {

		private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4);

		@Override
		public void register(Metric metric, String metricName, AbstractMetricGroup group) {
			if (!(metric instanceof Histogram)) {
				return;
			}
			final String identifier = group.getMetricIdentifier(metricName);
			latencyHistograms.add(Tuple2.of(identifier, (Histogram) metric));
		}

		@Override
		public char getDelimiter() {
			return '.';
		}

		@Override
		public char getDelimiter(int index) {
			return 0;
		}

		@Override
		public int getNumberReporters() {
			return 0;
		}

		@Override
		public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
			// Nothing is tracked for removal.
		}

		@Override
		public ScopeFormats getScopeFormats() {
			return null;
		}

		@Nullable
		@Override
		public String getMetricQueryServicePath() {
			return null;
		}
	}
}

0 comments on commit 91f8fe8

Please sign in to comment.