Skip to content

Commit

Permalink
[FLINK-9530][metrics] Fix numRecords task metric for chains
Browse files Browse the repository at this point in the history
This closes apache#6126.
  • Loading branch information
zentol committed Jun 14, 2018
1 parent 7e51b90 commit 03d77b8
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,6 @@ public ChainingOutput(
Counter tmpNumRecordsIn;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
ioMetricGroup.reuseOutputMetricsForTask();
tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
Expand Down Expand Up @@ -611,6 +613,55 @@ public void close() throws Exception {
}
}

@Test
public void testOperatorMetricReuse() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
.chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();

final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
@Override
public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
}
};

final StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};

final Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

testHarness.invoke(env);
testHarness.waitForTaskRunning();

final int numRecords = 5;

for (int x = 0; x < numRecords; x++) {
testHarness.processElement(new StreamRecord<>("hello"));
}
testHarness.waitForInputProcessing();

assertEquals(numRecords, numRecordsInCounter.getCount());
assertEquals(numRecords * 2 * 2 * 2, numRecordsOutCounter.getCount());
}

static class DuplicatingOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
@Override
public void processElement(StreamRecord<String> element) {
output.collect(element);
output.collect(element);
}
}

@Test
public void testWatermarkMetrics() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
Expand All @@ -36,6 +38,8 @@
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -48,6 +52,8 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.junit.Assert.assertEquals;

/**
* Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
* implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
Expand Down Expand Up @@ -382,6 +388,67 @@ public void testOvertakingCheckpointBarriers() throws Exception {
testHarness.getOutput());
}

@Test
public void testOperatorMetricReuse() throws Exception {
final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
.chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();

final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
@Override
public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
}
};

final StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};

final Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

testHarness.invoke(env);
testHarness.waitForTaskRunning();

final int numRecords1 = 5;
final int numRecords2 = 3;

for (int x = 0; x < numRecords1; x++) {
testHarness.processElement(new StreamRecord<>("hello"), 0, 0);
}

for (int x = 0; x < numRecords2; x++) {
testHarness.processElement(new StreamRecord<>("hello"), 1, 0);
}
testHarness.waitForInputProcessing();

assertEquals(numRecords1 + numRecords2, numRecordsInCounter.getCount());
assertEquals((numRecords1 + numRecords2) * 2 * 2 * 2, numRecordsOutCounter.getCount());
}

static class DuplicatingOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element);
output.collect(element);
}

@Override
public void processElement2(StreamRecord<String> element) {
output.collect(element);
output.collect(element);
}
}

@Test
public void testWatermarkMetrics() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
Expand Down

0 comments on commit 03d77b8

Please sign in to comment.