Skip to content

Commit

Permalink
[hotfix][metrics] Remove legacy/unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijiangW authored and zentol committed Apr 30, 2019
1 parent 0632600 commit f1b2e9b
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

Expand All @@ -53,8 +54,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY;

/**
* Tests for the HistoryServer.
*/
Expand All @@ -63,6 +62,10 @@ public class HistoryServerTest extends TestLogger {
@ClassRule
public static final TemporaryFolder TMP = new TemporaryFolder();

private static final JsonFactory JACKSON_FACTORY = new JsonFactory()
.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);

private MiniClusterWithClientResource cluster;
private File jmDirectory;
private File hsDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,51 +32,29 @@ public class IOMetrics implements Serializable {
protected long numRecordsIn;
protected long numRecordsOut;

protected double numRecordsInPerSecond;
protected double numRecordsOutPerSecond;

protected long numBytesInLocal;
protected long numBytesInRemote;
protected long numBytesOut;

protected double numBytesInLocalPerSecond;
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;

public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) {
this.numRecordsIn = recordsIn.getCount();
this.numRecordsInPerSecond = recordsIn.getRate();
this.numRecordsOut = recordsOut.getCount();
this.numRecordsOutPerSecond = recordsOut.getRate();
this.numBytesInLocal = bytesLocalIn.getCount();
this.numBytesInLocalPerSecond = bytesLocalIn.getRate();
this.numBytesInRemote = bytesRemoteIn.getCount();
this.numBytesInRemotePerSecond = bytesRemoteIn.getRate();
this.numBytesOut = bytesOut.getCount();
this.numBytesOutPerSecond = bytesOut.getRate();
}

public IOMetrics(
long numBytesInLocal,
long numBytesInRemote,
long numBytesOut,
long numRecordsIn,
long numRecordsOut,
double numBytesInLocalPerSecond,
double numBytesInRemotePerSecond,
double numBytesOutPerSecond,
double numRecordsInPerSecond,
double numRecordsOutPerSecond) {
long numRecordsOut) {
this.numBytesInLocal = numBytesInLocal;
this.numBytesInRemote = numBytesInRemote;
this.numBytesOut = numBytesOut;
this.numRecordsIn = numRecordsIn;
this.numRecordsOut = numRecordsOut;
this.numBytesInLocalPerSecond = numBytesInLocalPerSecond;
this.numBytesInRemotePerSecond = numBytesInRemotePerSecond;
this.numBytesOutPerSecond = numBytesOutPerSecond;
this.numRecordsInPerSecond = numRecordsInPerSecond;
this.numRecordsOutPerSecond = numRecordsOutPerSecond;
}

public long getNumRecordsIn() {
Expand All @@ -95,31 +73,7 @@ public long getNumBytesInRemote() {
return numBytesInRemote;
}

public long getNumBytesInTotal() {
return numBytesInLocal + numBytesInRemote;
}

public long getNumBytesOut() {
return numBytesOut;
}

public double getNumRecordsInPerSecond() {
return numRecordsInPerSecond;
}

public double getNumRecordsOutPerSecond() {
return numRecordsOutPerSecond;
}

public double getNumBytesInLocalPerSecond() {
return numBytesInLocalPerSecond;
}

public double getNumBytesInRemotePerSecond() {
return numBytesInRemotePerSecond;
}

public double getNumBytesOutPerSecond() {
return numBytesOutPerSecond;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,6 @@ public Counter getNumBuffersInRemoteCounter() {
return numBuffersInRemote;
}

public Meter getNumBytesInLocalRateMeter() {
return numBytesInRateLocal;
}

public Meter getNumBytesInRemoteRateMeter() {
return numBytesInRateRemote;
}

public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}

// ============================================================================================
// Buffer metrics
// ============================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;

import javax.annotation.Nullable;

import java.io.IOException;

/**
* This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics.
*
Expand All @@ -51,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics {
private boolean numRecordsOutComplete = true;

public MutableIOMetrics() {
super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
super(0, 0, 0, 0, 0);
}

public boolean isNumBytesInLocalComplete() {
Expand Down Expand Up @@ -151,44 +147,4 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche
}
}
}

/**
* Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
*
* <p>The JSON structure written is as follows:
* "metrics": {
* "read-bytes": 1,
* "read-bytes-complete": true,
* "write-bytes": 2,
* "write-bytes-complete": true,
* "read-records": 3,
* "read-records-complete": true,
* "write-records": 4,
* "write-records-complete": true
* }
*
* @param gen JsonGenerator to which the metrics should be written
* @throws IOException
*/
public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
/**
* As described in {@link addIOMetrics}, we want to distinguish incomplete values from 0.
* However, for API backward compatibility, incomplete metrics will still be represented by the 0 value and
* a boolean will indicate the completeness.
*/

gen.writeObjectFieldStart("metrics");

Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
gen.writeNumberField("read-bytes", numBytesIn);
gen.writeBooleanField("read-bytes-complete", (this.numBytesInLocalComplete && this.numBytesInRemoteComplete));
gen.writeNumberField("write-bytes", this.numBytesOut);
gen.writeBooleanField("write-bytes-complete", this.numBytesOutComplete);
gen.writeNumberField("read-records", this.numRecordsIn);
gen.writeBooleanField("read-records-complete", this.numRecordsInComplete);
gen.writeNumberField("write-records", this.numRecordsOut);
gen.writeBooleanField("write-records-complete", this.numRecordsOutComplete);

gen.writeEndObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception {
// verify behavior for canceled executions
Execution execution1 = graphAndExecutions.f1.values().iterator().next();

IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
accumulators.put("acc", new IntCounter(4));
AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
Expand All @@ -352,7 +352,7 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception {
// verify behavior for failed executions
Execution execution2 = graphAndExecutions.f1.values().iterator().next();

IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
accumulators2.put("acc", new IntCounter(8));
AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
Expand Down Expand Up @@ -380,7 +380,7 @@ public void testAccumulatorsAndMetricsStorage() throws Exception {

Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 1, v2, 1).f1;

IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();

Execution execution1 = executions.values().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ public void testHandleRequest() throws Exception {
bytesInRemote,
bytesOut,
recordsIn,
recordsOut,
0.0,
0.0,
0.0,
0.0,
0.0);
recordsOut);

final long[] timestamps = new long[ExecutionState.values().length];
timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,7 @@ public void testHandleRequest() throws Exception {
bytesInRemote,
bytesOut,
recordsIn,
recordsOut,
0.0,
0.0,
0.0,
0.0,
0.0);
recordsOut);

final ArchivedExecutionJobVertex archivedExecutionJobVertex = new ArchivedExecutionJobVertex(
new ArchivedExecutionVertex[]{
Expand Down
Loading

0 comments on commit f1b2e9b

Please sign in to comment.