Skip to content

Commit

Permalink
[FLINK-10242][metrics] Disable latency metrics by default
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 7, 2018
1 parent 78141f7 commit b0522e3
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 37 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/metric_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
<td style="word-wrap: break-word;">128</td>
<td>Defines the number of measured latencies to maintain at each operator.</td>
</tr>
<tr>
<td><h5>metrics.latency.interval</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.</td>
</tr>
<tr>
<td><h5>metrics.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
8 changes: 6 additions & 2 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup.

## Latency tracking

Flink allows to track the latency of records traveling through the system. To enable the latency tracking
a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
Flink allows to track the latency of records traveling through the system. This feature is disabled by default.
To enable the latency tracking you must set the `latencyTrackingInterval` to a positive number in either the
[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`.

At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`.
The marker contains a timestamp from the time when the record has been emitted at the sources.
Expand All @@ -1659,6 +1660,9 @@ latency issues caused by individual machines.
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
up an automated clock synchronisation service (like NTP) to avoid false latency results.

<span class="label label-danger">Warning</span> Enabling latency metrics can significantly impact the performance
of the cluster. It is highly recommended to only use them for debugging purposes.

## REST API integration

Metrics can be queried through the [Monitoring REST API]({{ site.baseurl }}/monitoring/rest_api.html).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -131,7 +132,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
/**
* Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
*/
private long latencyTrackingInterval = 2000L;
private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();

private boolean isLatencyTrackingConfigured = false;

/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
Expand Down Expand Up @@ -234,15 +237,14 @@ public long getAutoWatermarkInterval() {
* Interval for sending latency tracking marks from the sources to the sinks.
* Flink will send latency tracking marks from the sources at the specified interval.
*
* Recommended value: 2000 (2 seconds).
*
* Setting a tracking interval <= 0 disables the latency tracking.
*
* @param interval Interval in milliseconds.
*/
@PublicEvolving
public ExecutionConfig setLatencyTrackingInterval(long interval) {
this.latencyTrackingInterval = interval;
this.isLatencyTrackingConfigured = true;
return this;
}

Expand All @@ -256,12 +258,17 @@ public long getLatencyTrackingInterval() {
}

/**
* Returns if latency tracking is enabled
* @return True, if the tracking is enabled, false otherwise.
* @deprecated will be removed in a future version
*/
@PublicEvolving
@Deprecated
public boolean isLatencyTrackingEnabled() {
return latencyTrackingInterval > 0;
return isLatencyTrackingConfigured && latencyTrackingInterval > 0;
}

@Internal
public boolean isLatencyTrackingConfigured() {
return isLatencyTrackingConfigured;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;

import static org.apache.flink.configuration.ConfigOptions.key;

Expand Down Expand Up @@ -104,6 +105,13 @@ public class MetricOptions {
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>")
.withDescription("Defines the scope format string that is applied to all metrics scoped to an operator.");

public static final ConfigOption<Long> LATENCY_INTERVAL =
key("metrics.latency.interval")
.defaultValue(0L)
.withDescription("Defines the interval at which latency tracking marks are emitted from the sources." +
" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
" impact the performance of the cluster.");

/** The number of measured latencies to maintain at each operator. */
public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
key("metrics.latency.history-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -62,12 +64,17 @@ public void run(final Object lockingObject,

final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

LatencyMarksEmitter latencyEmitter = null;
if (getExecutionConfig().isLatencyTrackingEnabled()) {
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
getExecutionConfig().getLatencyTrackingInterval(),
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand All @@ -38,6 +40,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -55,20 +58,104 @@
import static org.mockito.Mockito.when;

/**
* Tests for {@link StreamSource} operators.
* Tests for the emission of latency markers by {@link StreamSource} operators.
*/
@SuppressWarnings("serial")
public class StreamSourceOperatorLatencyMetricsTest {
public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {

private static final long maxProcessingTime = 100L;
private static final long latencyMarkInterval = 10L;

/**
* Test that latency marks are emitted.
* Verifies that by default no latency metrics are emitted.
*/
@Test
public void testLatencyMarkEmission() throws Exception {
final List<StreamElement> output = new ArrayList<>();
public void testLatencyMarkEmissionDisabled() throws Exception {
testLatencyMarkEmission(0, (operator, timeProvider) -> {
setupSourceOperator(operator, new ExecutionConfig(), MockEnvironment.builder().build(), timeProvider);
});
}

/**
* Verifies that latency metrics can be enabled via the {@link ExecutionConfig}.
*/
@Test
public void testLatencyMarkEmissionEnabledViaExecutionConfig() throws Exception {
testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setLatencyTrackingInterval(latencyMarkInterval);

setupSourceOperator(operator, executionConfig, MockEnvironment.builder().build(), timeProvider);
});
}

/**
* Verifies that latency metrics can be enabled via the configuration.
*/
@Test
public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
Configuration tmConfig = new Configuration();
tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);

Environment env = MockEnvironment.builder()
.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
.build();

setupSourceOperator(operator, new ExecutionConfig(), env, timeProvider);
});
}

/**
* Verifies that latency metrics can be enabled via the {@link ExecutionConfig} even if they are disabled via
* the configuration.
*/
@Test
public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setLatencyTrackingInterval(latencyMarkInterval);

Configuration tmConfig = new Configuration();
tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);

Environment env = MockEnvironment.builder()
.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
.build();

setupSourceOperator(operator, executionConfig, env, timeProvider);
});
}

/**
* Verifies that latency metrics can be disabled via the {@link ExecutionConfig} even if they are enabled via
* the configuration.
*/
@Test
public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
testLatencyMarkEmission(0, (operator, timeProvider) -> {
Configuration tmConfig = new Configuration();
tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);

Environment env = MockEnvironment.builder()
.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
.build();

final long maxProcessingTime = 100L;
final long latencyMarkInterval = 10L;
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setLatencyTrackingInterval(0);

setupSourceOperator(operator, executionConfig, env, timeProvider);
});
}

private interface OperatorSetupOperation {
void setupSourceOperator(
StreamSource<Long, ?> operator,
TestProcessingTimeService testProcessingTimeService
);
}

private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOperation operatorSetup) throws Exception {
final List<StreamElement> output = new ArrayList<>();

final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
testProcessingTimeService.setCurrentTime(0L);
Expand All @@ -78,23 +165,20 @@ public void testLatencyMarkEmission() throws Exception {
final StreamSource<Long, ProcessingTimeServiceSource> operator =
new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));

// emit latency marks every 10 milliseconds.
setupSourceOperator(operator, TimeCharacteristic.EventTime, latencyMarkInterval, testProcessingTimeService);
operatorSetup.setupSourceOperator(operator, testProcessingTimeService);

// run and wait to be stopped
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));

int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;

assertEquals(
numberLatencyMarkers + 1, // + 1 is the final watermark element
output.size());

long timestamp = 0L;

int i = 0;
// and that its only latency markers + a final watermark
for (; i < output.size() - 1; i++) {
// verify that its only latency markers + a final watermark
for (; i < numberLatencyMarkers; i++) {
StreamElement se = output.get(i);
Assert.assertTrue(se.isLatencyMarker());
Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
Expand All @@ -109,23 +193,18 @@ public void testLatencyMarkEmission() throws Exception {

// ------------------------------------------------------------------------

@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
TimeCharacteristic timeChar,
long latencyMarkInterval,
final ProcessingTimeService timeProvider) {

ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
private static <T> void setupSourceOperator(
StreamSource<T, ?> operator,
ExecutionConfig executionConfig,
Environment env,
ProcessingTimeService timeProvider) {

StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStateBackend(new MemoryStateBackend());

cfg.setTimeCharacteristic(timeChar);
cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
cfg.setOperatorID(new OperatorID());

Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);

StreamStatusMaintainer streamStatusMaintainer = mock(StreamStatusMaintainer.class);
when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);

Expand Down

0 comments on commit b0522e3

Please sign in to comment.