From e40cb34868e2c6ff7548653e1e5e2dfbe7d47967 Mon Sep 17 00:00:00 2001 From: yew1eb Date: Sat, 14 Oct 2017 17:19:49 +0800 Subject: [PATCH] [FLINK-7608][metrics] Rework latency metric This closes #5161. --- .../DescriptiveStatisticsHistogram.java | 50 ++++++ ...criptiveStatisticsHistogramStatistics.java | 71 ++++++++ .../api/operators/AbstractStreamOperator.java | 151 ++---------------- .../streaming/api/operators/StreamSink.java | 4 +- .../streaming/api/operators/StreamSource.java | 7 +- .../runtime/streamrecord/LatencyMarker.java | 17 +- .../streamrecord/StreamElementSerializer.java | 8 +- .../flink/streaming/util/LatencyStats.java | 62 +++++++ .../operators/StreamSourceOperatorTest.java | 4 +- 9 files changed, 223 insertions(+), 151 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java new file mode 100644 index 0000000000000..f01b984be340b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; + +/** + * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}. + */ +public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram { + + private final DescriptiveStatistics descriptiveStatistics; + + public DescriptiveStatisticsHistogram(int windowSize) { + this.descriptiveStatistics = new DescriptiveStatistics(windowSize); + } + + @Override + public void update(long value) { + this.descriptiveStatistics.addValue(value); + } + + @Override + public long getCount() { + return this.descriptiveStatistics.getN(); + } + + @Override + public HistogramStatistics getStatistics() { + return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java new file mode 100644 index 0000000000000..01d4c30a122fb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.metrics.HistogramStatistics; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; + +import java.util.Arrays; + +/** + * DescriptiveStatistics histogram statistics implementation returned by {@link DescriptiveStatisticsHistogram}. + * The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly. + */ +public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistics { + private final DescriptiveStatistics descriptiveStatistics; + + public DescriptiveStatisticsHistogramStatistics(DescriptiveStatistics latencyHistogram) { + this.descriptiveStatistics = latencyHistogram; + } + + @Override + public double getQuantile(double quantile) { + return descriptiveStatistics.getPercentile(quantile * 100); + } + + @Override + public long[] getValues() { + return Arrays.stream(descriptiveStatistics.getValues()).mapToLong(i -> (long) i).toArray(); + } + + @Override + public int size() { + return (int) descriptiveStatistics.getN(); + } + + @Override + public double getMean() { + return descriptiveStatistics.getMean(); + } + + @Override + public double getStdDev() { + return descriptiveStatistics.getStandardDeviation(); + } + + @Override + public long getMax() { + return (long) descriptiveStatistics.getMax(); + } + + @Override + public long getMin() { + return (long) descriptiveStatistics.getMin(); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 7db157caec2c1..42b6923dfe6a3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -31,12 +31,12 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -60,21 +60,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; import org.apache.flink.util.CloseableIterable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; -import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.Serializable; -import java.util.ConcurrentModificationException; -import java.util.HashMap; -import java.util.Map; /** * Base class for all stream operators. Operators that contain a user function should extend the class @@ -150,7 +147,7 @@ public abstract class AbstractStreamOperator /** Metric group for the operator. */ protected transient OperatorMetricGroup metrics; - protected transient LatencyGauge latencyGauge; + protected transient LatencyStats latencyStats; // ---------------- time handler ------------------ @@ -188,14 +185,21 @@ public void setup(StreamTask containingTask, StreamConfig config, Output>> { - private final Map latencyStats = new HashMap<>(); - private final int historySize; - - LatencyGauge(int historySize) { - this.historySize = historySize; - } - - public void reportLatency(LatencyMarker marker, boolean isSink) { - LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink); - DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor); - if (sourceStats == null) { - // 512 element window (4 kb) - sourceStats = new DescriptiveStatistics(this.historySize); - latencyStats.put(sourceDescriptor, sourceStats); - } - long now = System.currentTimeMillis(); - sourceStats.addValue(now - marker.getMarkedTime()); - } - - @Override - public Map> getValue() { - while (true) { - try { - Map> ret = new HashMap<>(); - for (Map.Entry source : latencyStats.entrySet()) { - HashMap sourceStatistics = new HashMap<>(6); - sourceStatistics.put("max", source.getValue().getMax()); - sourceStatistics.put("mean", source.getValue().getMean()); - sourceStatistics.put("min", source.getValue().getMin()); - sourceStatistics.put("p50", source.getValue().getPercentile(50)); - sourceStatistics.put("p95", source.getValue().getPercentile(95)); - sourceStatistics.put("p99", source.getValue().getPercentile(99)); - ret.put(source.getKey().toString(), sourceStatistics); - } - return ret; - // Concurrent access onto the "latencyStats" map could cause - // ConcurrentModificationExceptions. To avoid unnecessary blocking - // of the reportLatency() method, we retry this operation until - // it succeeds. - } catch (ConcurrentModificationException ignore) { - LOG.debug("Unable to report latency statistics", ignore); - } - } - } - } - - /** - * Identifier for a latency source. - */ - private static class LatencySourceDescriptor { - /** - * A unique ID identifying a logical source in Flink. - */ - private final int vertexID; - - /** - * Identifier for parallel subtasks of a logical source. - */ - private final int subtaskIndex; - - /** - * Creates a {@code LatencySourceDescriptor} from a given {@code LatencyMarker}. - * - * @param marker The latency marker to extract the LatencySourceDescriptor from. - * @param ignoreSubtaskIndex Set to true to ignore the subtask index, to treat the latencies - * from all the parallel instances of a source as the same. - * @return A LatencySourceDescriptor for the given marker. - */ - public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) { - if (ignoreSubtaskIndex) { - return new LatencySourceDescriptor(marker.getVertexID(), -1); - } else { - return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex()); - } - - } - - private LatencySourceDescriptor(int vertexID, int subtaskIndex) { - this.vertexID = vertexID; - this.subtaskIndex = subtaskIndex; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - LatencySourceDescriptor that = (LatencySourceDescriptor) o; - - if (vertexID != that.vertexID) { - return false; - } - return subtaskIndex == that.subtaskIndex; - } - - @Override - public int hashCode() { - int result = vertexID; - result = 31 * result + subtaskIndex; - return result; - } - - @Override - public String toString() { - return "LatencySourceDescriptor{" + - "vertexID=" + vertexID + - ", subtaskIndex=" + subtaskIndex + - '}'; - } - } - /** * Wrapping {@link Output} that updates metrics on the number of emitted elements. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index 667e1307e7eb5..1c708769382d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -57,9 +57,9 @@ public void processElement(StreamRecord element) throws Exception { } @Override - protected void reportOrForwardLatencyMarker(LatencyMarker maker) { + protected void reportOrForwardLatencyMarker(LatencyMarker marker) { // all operators are tracking latencies - this.latencyGauge.reportLatency(maker, true); + this.latencyStats.reportLatency(marker); // sinks don't forward latency markers } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 744c90aa8d13a..5600d8f13cc69 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -67,7 +68,7 @@ public void run(final Object lockingObject, getProcessingTimeService(), collector, getExecutionConfig().getLatencyTrackingInterval(), - getOperatorConfig().getVertexID(), + this.getOperatorID(), getRuntimeContext().getIndexOfThisSubtask()); } @@ -138,7 +139,7 @@ public LatencyMarksEmitter( final ProcessingTimeService processingTimeService, final Output> output, long latencyTrackingInterval, - final int vertexID, + final OperatorID operatorId, final int subtaskIndex) { latencyMarkTimer = processingTimeService.scheduleAtFixedRate( @@ -147,7 +148,7 @@ public LatencyMarksEmitter( public void onProcessingTime(long timestamp) throws Exception { try { // ProcessingTimeService callbacks are executed under the checkpointing lock - output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex)); + output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)); } catch (Throwable t) { // we catch the Throwables here so that we don't trigger the processing // timer services async exception handler diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java index 84af297680a48..932e1300d9c28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.runtime.jobgraph.OperatorID; /** * Special record type carrying a timestamp of its creation time at a source operator @@ -35,16 +36,16 @@ public final class LatencyMarker extends StreamElement { /** The time the latency mark is denoting. */ private final long markedTime; - private final int vertexID; + private final OperatorID operatorId; private final int subtaskIndex; /** * Creates a latency mark with the given timestamp. */ - public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) { + public LatencyMarker(long markedTime, OperatorID operatorId, int subtaskIndex) { this.markedTime = markedTime; - this.vertexID = vertexID; + this.operatorId = operatorId; this.subtaskIndex = subtaskIndex; } @@ -55,8 +56,8 @@ public long getMarkedTime() { return markedTime; } - public int getVertexID() { - return vertexID; + public OperatorID getOperatorId() { + return operatorId; } public int getSubtaskIndex() { @@ -79,7 +80,7 @@ public boolean equals(Object o) { if (markedTime != that.markedTime) { return false; } - if (vertexID != that.vertexID) { + if (operatorId != that.operatorId) { return false; } return subtaskIndex == that.subtaskIndex; @@ -89,7 +90,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = (int) (markedTime ^ (markedTime >>> 32)); - result = 31 * result + vertexID; + result = 31 * result + operatorId.hashCode(); result = 31 * result + subtaskIndex; return result; } @@ -98,7 +99,7 @@ public int hashCode() { public String toString() { return "LatencyMarker{" + "markedTime=" + markedTime + - ", vertexID=" + vertexID + + ", operatorId=" + operatorId + ", subtaskIndex=" + subtaskIndex + '}'; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index d0ab60a3ab347..ba92416d792ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; @@ -186,7 +187,8 @@ else if (value.isStreamStatus()) { else if (value.isLatencyMarker()) { target.write(TAG_LATENCY_MARKER); target.writeLong(value.asLatencyMarker().getMarkedTime()); - target.writeInt(value.asLatencyMarker().getVertexID()); + target.writeLong(value.asLatencyMarker().getOperatorId().getLowerPart()); + target.writeLong(value.asLatencyMarker().getOperatorId().getUpperPart()); target.writeInt(value.asLatencyMarker().getSubtaskIndex()); } else { @@ -211,7 +213,7 @@ else if (tag == TAG_STREAM_STATUS) { return new StreamStatus(source.readInt()); } else if (tag == TAG_LATENCY_MARKER) { - return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt()); } else { throw new IOException("Corrupt stream, found tag: " + tag); @@ -238,7 +240,7 @@ else if (tag == TAG_WATERMARK) { return new Watermark(source.readLong()); } else if (tag == TAG_LATENCY_MARKER) { - return new LatencyMarker(source.readLong(), source.readInt(), source.readInt()); + return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt()); } else { throw new IOException("Corrupt stream, found tag: " + tag); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java new file mode 100644 index 0000000000000..4f3d33ec6f921 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; + +import java.util.HashMap; +import java.util.Map; + +/** + * The {@link LatencyStats} objects are used to track and report on the behavior of latencies across measurements. + */ +public class LatencyStats { + private final Map latencyStats = new HashMap<>(); + private final MetricGroup metricGroup; + private final int historySize; + private final int subtaskIndex; + private final OperatorID operatorId; + + public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) { + this.metricGroup = metricGroup; + this.historySize = historySize; + this.subtaskIndex = subtaskIndex; + this.operatorId = operatorID; + } + + public void reportLatency(LatencyMarker marker) { + String uniqueName = "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex; + DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName); + if (latencyHistogram == null) { + latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize); + this.latencyStats.put(uniqueName, latencyHistogram); + this.metricGroup + .addGroup("source_id", String.valueOf(marker.getOperatorId())) + .addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex())) + .addGroup("operator_id", String.valueOf(operatorId)) + .addGroup("operator_subtask_index", String.valueOf(subtaskIndex)) + .histogram("latency", latencyHistogram); + } + + long now = System.currentTimeMillis(); + latencyHistogram.update(now - marker.getMarkedTime()); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index d9fcc12b4d5f3..cf09a6ebdb857 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -214,7 +215,7 @@ public void testLatencyMarkEmission() throws Exception { for (; i < output.size() - 1; i++) { StreamElement se = output.get(i); Assert.assertTrue(se.isLatencyMarker()); - Assert.assertEquals(-1, se.asLatencyMarker().getVertexID()); + Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId()); Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex()); Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp); @@ -290,6 +291,7 @@ private static void setupSourceOperator(StreamSource operator, cfg.setStateBackend(new MemoryStateBackend()); cfg.setTimeCharacteristic(timeChar); + cfg.setOperatorID(new OperatorID()); Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);