From f1b2e9bfc79ebcc93c4fb111d2b75db41b6e7051 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Wed, 17 Apr 2019 23:31:38 +0800 Subject: [PATCH] [hotfix][metrics] Remove legacy/unused code --- .../webmonitor/history/HistoryServerTest.java | 7 +- .../runtime/executiongraph/IOMetrics.java | 48 +---- .../metrics/groups/TaskIOMetricGroup.java | 12 -- .../rest/handler/util/MutableIOMetrics.java | 46 +---- .../ExecutionGraphDeploymentTest.java | 6 +- ...btaskCurrentAttemptDetailsHandlerTest.java | 7 +- ...askExecutionAttemptDetailsHandlerTest.java | 7 +- .../utils/ArchivedJobGenerationUtils.java | 167 ------------------ 8 files changed, 12 insertions(+), 288 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index fce2b6dcc31d0..2a05a880ac24f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -53,8 +54,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY; - /** * Tests for the HistoryServer. */ @@ -63,6 +62,10 @@ public class HistoryServerTest extends TestLogger { @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); + private static final JsonFactory JACKSON_FACTORY = new JsonFactory() + .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); + private MiniClusterWithClientResource cluster; private File jmDirectory; private File hsDirectory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index 806de0b751c85..b74ac1dd5656e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -32,28 +32,16 @@ public class IOMetrics implements Serializable { protected long numRecordsIn; protected long numRecordsOut; - protected double numRecordsInPerSecond; - protected double numRecordsOutPerSecond; - protected long numBytesInLocal; protected long numBytesInRemote; protected long numBytesOut; - protected double numBytesInLocalPerSecond; - protected double numBytesInRemotePerSecond; - protected double numBytesOutPerSecond; - public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) { this.numRecordsIn = recordsIn.getCount(); - this.numRecordsInPerSecond = recordsIn.getRate(); this.numRecordsOut = recordsOut.getCount(); - this.numRecordsOutPerSecond = recordsOut.getRate(); this.numBytesInLocal = bytesLocalIn.getCount(); - this.numBytesInLocalPerSecond = bytesLocalIn.getRate(); this.numBytesInRemote = bytesRemoteIn.getCount(); - this.numBytesInRemotePerSecond = bytesRemoteIn.getRate(); this.numBytesOut = bytesOut.getCount(); - this.numBytesOutPerSecond = bytesOut.getRate(); } public IOMetrics( @@ -61,22 +49,12 @@ public IOMetrics( long numBytesInRemote, long numBytesOut, long numRecordsIn, - long numRecordsOut, - double numBytesInLocalPerSecond, - double numBytesInRemotePerSecond, - double numBytesOutPerSecond, - double numRecordsInPerSecond, - double numRecordsOutPerSecond) { + long numRecordsOut) { this.numBytesInLocal = numBytesInLocal; this.numBytesInRemote = numBytesInRemote; this.numBytesOut = numBytesOut; this.numRecordsIn = numRecordsIn; this.numRecordsOut = numRecordsOut; - this.numBytesInLocalPerSecond = numBytesInLocalPerSecond; - this.numBytesInRemotePerSecond = numBytesInRemotePerSecond; - this.numBytesOutPerSecond = numBytesOutPerSecond; - this.numRecordsInPerSecond = numRecordsInPerSecond; - this.numRecordsOutPerSecond = numRecordsOutPerSecond; } public long getNumRecordsIn() { @@ -95,31 +73,7 @@ public long getNumBytesInRemote() { return numBytesInRemote; } - public long getNumBytesInTotal() { - return numBytesInLocal + numBytesInRemote; - } - public long getNumBytesOut() { return numBytesOut; } - - public double getNumRecordsInPerSecond() { - return numRecordsInPerSecond; - } - - public double getNumRecordsOutPerSecond() { - return numRecordsOutPerSecond; - } - - public double getNumBytesInLocalPerSecond() { - return numBytesInLocalPerSecond; - } - - public double getNumBytesInRemotePerSecond() { - return numBytesInRemotePerSecond; - } - - public double getNumBytesOutPerSecond() { - return numBytesOutPerSecond; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 79c5e8f41c764..aa6a3324317d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -119,18 +119,6 @@ public Counter getNumBuffersInRemoteCounter() { return numBuffersInRemote; } - public Meter getNumBytesInLocalRateMeter() { - return numBytesInRateLocal; - } - - public Meter getNumBytesInRemoteRateMeter() { - return numBytesInRateRemote; - } - - public Meter getNumBytesOutRateMeter() { - return numBytesOutRate; - } - // ============================================================================================ // Buffer metrics // ============================================================================================ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 6822417747040..e678e479241f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -26,12 +26,8 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; - import javax.annotation.Nullable; -import java.io.IOException; - /** * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics. * @@ -51,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics { private boolean numRecordsOutComplete = true; public MutableIOMetrics() { - super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D); + super(0, 0, 0, 0, 0); } public boolean isNumBytesInLocalComplete() { @@ -151,44 +147,4 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche } } } - - /** - * Writes the IO metrics contained in this object to the given {@link JsonGenerator}. - * - *

The JSON structure written is as follows: - * "metrics": { - * "read-bytes": 1, - * "read-bytes-complete": true, - * "write-bytes": 2, - * "write-bytes-complete": true, - * "read-records": 3, - * "read-records-complete": true, - * "write-records": 4, - * "write-records-complete": true - * } - * - * @param gen JsonGenerator to which the metrics should be written - * @throws IOException - */ - public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException { - /** - * As described in {@link addIOMetrics}, we want to distinguish incomplete values from 0. - * However, for API backward compatibility, incomplete metrics will still be represented by the 0 value and - * a boolean will indicate the completeness. - */ - - gen.writeObjectFieldStart("metrics"); - - Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote; - gen.writeNumberField("read-bytes", numBytesIn); - gen.writeBooleanField("read-bytes-complete", (this.numBytesInLocalComplete && this.numBytesInRemoteComplete)); - gen.writeNumberField("write-bytes", this.numBytesOut); - gen.writeBooleanField("write-bytes-complete", this.numBytesOutComplete); - gen.writeNumberField("read-records", this.numRecordsIn); - gen.writeBooleanField("read-records-complete", this.numRecordsInComplete); - gen.writeNumberField("write-records", this.numRecordsOut); - gen.writeBooleanField("write-records-complete", this.numRecordsOutComplete); - - gen.writeEndObject(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index cb594670c996f..cb68579a3340d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -336,7 +336,7 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception { // verify behavior for canceled executions Execution execution1 = graphAndExecutions.f1.values().iterator().next(); - IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0); + IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0); Map> accumulators = new HashMap<>(); accumulators.put("acc", new IntCounter(4)); AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators); @@ -352,7 +352,7 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception { // verify behavior for failed executions Execution execution2 = graphAndExecutions.f1.values().iterator().next(); - IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0); + IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0); Map> accumulators2 = new HashMap<>(); accumulators2.put("acc", new IntCounter(8)); AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2); @@ -380,7 +380,7 @@ public void testAccumulatorsAndMetricsStorage() throws Exception { Map executions = setupExecution(v1, 1, v2, 1).f1; - IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0); + IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0); Map> accumulators = Collections.emptyMap(); Execution execution1 = executions.values().iterator().next(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index 08bcaa0f1a251..e5b4fcedc2066 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -81,12 +81,7 @@ public void testHandleRequest() throws Exception { bytesInRemote, bytesOut, recordsIn, - recordsOut, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0); + recordsOut); final long[] timestamps = new long[ExecutionState.values().length]; timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index ca14af4548f86..fbc825e86fb26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -84,12 +84,7 @@ public void testHandleRequest() throws Exception { bytesInRemote, bytesOut, recordsIn, - recordsOut, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0); + recordsOut); final ArchivedExecutionJobVertex archivedExecutionJobVertex = new ArchivedExecutionJobVertex( new ArchivedExecutionVertex[]{ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java deleted file mode 100644 index caf0fed2ae7b4..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.legacy.utils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecution; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; -import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; - -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -/** - * Common entry-point for accessing generated ArchivedExecution* components. - */ -public class ArchivedJobGenerationUtils { - public static final ObjectMapper MAPPER = new ObjectMapper(); - public static final JsonFactory JACKSON_FACTORY = new JsonFactory() - .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) - .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); - - private static ArchivedExecutionGraph originalJob; - private static ArchivedExecutionJobVertex originalTask; - private static ArchivedExecutionVertex originalSubtask; - private static ArchivedExecution originalAttempt; - - private static final Object lock = new Object(); - - private ArchivedJobGenerationUtils() { - } - - public static AccessExecutionGraph getTestJob() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalJob; - } - - public static AccessExecutionJobVertex getTestTask() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalTask; - } - - public static AccessExecutionVertex getTestSubtask() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalSubtask; - } - - public static AccessExecution getTestAttempt() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalAttempt; - } - - private static void generateArchivedJob() throws Exception { - // Attempt - StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1"); - StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2"); - TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234); - AllocationID allocationID = new AllocationID(42L, 43L); - originalAttempt = new ArchivedExecutionBuilder() - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) - .setParallelSubtaskIndex(1) - .setAttemptNumber(0) - .setAssignedResourceLocation(location) - .setAssignedAllocationID(allocationID) - .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) - .setState(ExecutionState.FINISHED) - .setFailureCause("attemptException") - .build(); - // Subtask - originalSubtask = new ArchivedExecutionVertexBuilder() - .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex()) - .setTaskNameWithSubtask("hello(1/1)") - .setCurrentExecution(originalAttempt) - .build(); - // Task - originalTask = new ArchivedExecutionJobVertexBuilder() - .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask}) - .build(); - // Job - Map tasks = new HashMap<>(); - tasks.put(originalTask.getJobVertexId(), originalTask); - originalJob = new ArchivedExecutionGraphBuilder() - .setJobID(new JobID()) - .setTasks(tasks) - .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) - .setState(JobStatus.FINISHED) - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) - .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) - .build(); - } - - // ======================================================================== - // utility methods - // ======================================================================== - - public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) { - assertEquals(expectedAccs.length, writtenAccs.size()); - for (int x = 0; x < expectedAccs.length; x++) { - JsonNode acc = writtenAccs.get(x); - - assertEquals(expectedAccs[x].getName(), acc.get("name").asText()); - assertEquals(expectedAccs[x].getType(), acc.get("type").asText()); - assertEquals(expectedAccs[x].getValue(), acc.get("value").asText()); - } - } - - public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) { - assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong()); - assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong()); - assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong()); - assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong()); - } -}