Skip to content

Commit

Permalink
[FLINK-7608][metrics] Rework latency metric
Browse files Browse the repository at this point in the history
This closes apache#5161.
  • Loading branch information
yew1eb authored and zentol committed Feb 6, 2018
1 parent dbb81ac commit e40cb34
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.metrics;

import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

/**
* The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}.
*/
public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram {

private final DescriptiveStatistics descriptiveStatistics;

public DescriptiveStatisticsHistogram(int windowSize) {
this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
}

@Override
public void update(long value) {
this.descriptiveStatistics.addValue(value);
}

@Override
public long getCount() {
return this.descriptiveStatistics.getN();
}

@Override
public HistogramStatistics getStatistics() {
return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.metrics;

import org.apache.flink.metrics.HistogramStatistics;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

import java.util.Arrays;

/**
* DescriptiveStatistics histogram statistics implementation returned by {@link DescriptiveStatisticsHistogram}.
* The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
*/
public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistics {
private final DescriptiveStatistics descriptiveStatistics;

public DescriptiveStatisticsHistogramStatistics(DescriptiveStatistics latencyHistogram) {
this.descriptiveStatistics = latencyHistogram;
}

@Override
public double getQuantile(double quantile) {
return descriptiveStatistics.getPercentile(quantile * 100);
}

@Override
public long[] getValues() {
return Arrays.stream(descriptiveStatistics.getValues()).mapToLong(i -> (long) i).toArray();
}

@Override
public int size() {
return (int) descriptiveStatistics.getN();
}

@Override
public double getMean() {
return descriptiveStatistics.getMean();
}

@Override
public double getStdDev() {
return descriptiveStatistics.getStandardDeviation();
}

@Override
public long getMax() {
return (long) descriptiveStatistics.getMax();
}

@Override
public long getMin() {
return (long) descriptiveStatistics.getMin();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
Expand All @@ -60,21 +60,18 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/**
* Base class for all stream operators. Operators that contain a user function should extend the class
Expand Down Expand Up @@ -150,7 +147,7 @@ public abstract class AbstractStreamOperator<OUT>
/** Metric group for the operator. */
protected transient OperatorMetricGroup metrics;

protected transient LatencyGauge latencyGauge;
protected transient LatencyStats latencyStats;

// ---------------- time handler ------------------

Expand Down Expand Up @@ -188,14 +185,21 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.output = output;
}
Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {
LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();

try {
Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {
LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
} catch (Exception e) {
LOG.warn("An error occurred while instantiating latency metrics.", e);
this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
}

latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());

stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
Expand Down Expand Up @@ -642,135 +646,14 @@ public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception

protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// all operators are tracking latencies
this.latencyGauge.reportLatency(marker, false);
this.latencyStats.reportLatency(marker);

// everything except sinks forwards latency markers
this.output.emitLatencyMarker(marker);
}

// ----------------------- Helper classes -----------------------


/**
* The gauge uses a HashMap internally to avoid classloading issues when accessing
* the values using JMX.
*/
protected static class LatencyGauge implements Gauge<Map<String, HashMap<String, Double>>> {
private final Map<LatencySourceDescriptor, DescriptiveStatistics> latencyStats = new HashMap<>();
private final int historySize;

LatencyGauge(int historySize) {
this.historySize = historySize;
}

public void reportLatency(LatencyMarker marker, boolean isSink) {
LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink);
DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor);
if (sourceStats == null) {
// 512 element window (4 kb)
sourceStats = new DescriptiveStatistics(this.historySize);
latencyStats.put(sourceDescriptor, sourceStats);
}
long now = System.currentTimeMillis();
sourceStats.addValue(now - marker.getMarkedTime());
}

@Override
public Map<String, HashMap<String, Double>> getValue() {
while (true) {
try {
Map<String, HashMap<String, Double>> ret = new HashMap<>();
for (Map.Entry<LatencySourceDescriptor, DescriptiveStatistics> source : latencyStats.entrySet()) {
HashMap<String, Double> sourceStatistics = new HashMap<>(6);
sourceStatistics.put("max", source.getValue().getMax());
sourceStatistics.put("mean", source.getValue().getMean());
sourceStatistics.put("min", source.getValue().getMin());
sourceStatistics.put("p50", source.getValue().getPercentile(50));
sourceStatistics.put("p95", source.getValue().getPercentile(95));
sourceStatistics.put("p99", source.getValue().getPercentile(99));
ret.put(source.getKey().toString(), sourceStatistics);
}
return ret;
// Concurrent access onto the "latencyStats" map could cause
// ConcurrentModificationExceptions. To avoid unnecessary blocking
// of the reportLatency() method, we retry this operation until
// it succeeds.
} catch (ConcurrentModificationException ignore) {
LOG.debug("Unable to report latency statistics", ignore);
}
}
}
}

/**
* Identifier for a latency source.
*/
private static class LatencySourceDescriptor {
/**
* A unique ID identifying a logical source in Flink.
*/
private final int vertexID;

/**
* Identifier for parallel subtasks of a logical source.
*/
private final int subtaskIndex;

/**
* Creates a {@code LatencySourceDescriptor} from a given {@code LatencyMarker}.
*
* @param marker The latency marker to extract the LatencySourceDescriptor from.
* @param ignoreSubtaskIndex Set to true to ignore the subtask index, to treat the latencies
* from all the parallel instances of a source as the same.
* @return A LatencySourceDescriptor for the given marker.
*/
public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) {
if (ignoreSubtaskIndex) {
return new LatencySourceDescriptor(marker.getVertexID(), -1);
} else {
return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
}

}

private LatencySourceDescriptor(int vertexID, int subtaskIndex) {
this.vertexID = vertexID;
this.subtaskIndex = subtaskIndex;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

LatencySourceDescriptor that = (LatencySourceDescriptor) o;

if (vertexID != that.vertexID) {
return false;
}
return subtaskIndex == that.subtaskIndex;
}

@Override
public int hashCode() {
int result = vertexID;
result = 31 * result + subtaskIndex;
return result;
}

@Override
public String toString() {
return "LatencySourceDescriptor{" +
"vertexID=" + vertexID +
", subtaskIndex=" + subtaskIndex +
'}';
}
}

/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void processElement(StreamRecord<IN> element) throws Exception {
}

@Override
protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// all operators are tracking latencies
this.latencyGauge.reportLatency(maker, true);
this.latencyStats.reportLatency(marker);

// sinks don't forward latency markers
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void run(final Object lockingObject,
getProcessingTimeService(),
collector,
getExecutionConfig().getLatencyTrackingInterval(),
getOperatorConfig().getVertexID(),
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}

Expand Down Expand Up @@ -138,7 +139,7 @@ public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final int vertexID,
final OperatorID operatorId,
final int subtaskIndex) {

latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
Expand All @@ -147,7 +148,7 @@ public LatencyMarksEmitter(
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
Expand Down
Loading

0 comments on commit e40cb34

Please sign in to comment.