Skip to content

Commit

Permalink
[hotfix][runtime] Clean up unnecessary type argument declarations in …
Browse files Browse the repository at this point in the history
…TwoInputStreamTaskTest and TwoInputStreamTaskTestHarness
  • Loading branch information
sunhaibotb authored and pnowojski committed Aug 30, 2019
1 parent 8ca6a09 commit 6779bd6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public class TwoInputStreamTaskTest {
* timestamp to emitted elements.
*/
@Test
@SuppressWarnings("unchecked")
public void testOpenCloseAndTimestamps() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
Expand All @@ -91,20 +90,20 @@ public void testOpenCloseAndTimestamps() throws Exception {
streamConfig.setOperatorID(new OperatorID());

long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.invoke();
testHarness.waitForTaskRunning();

testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1), 0, 0);
expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1));

// wait until the input is processed to ensure ordering of the output
testHarness.waitForInputProcessing();

testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
testHarness.processElement(new StreamRecord<>(1337, initialTime + 2), 1, 0);

expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
expectedOutput.add(new StreamRecord<>("1337", initialTime + 2));

testHarness.waitForInputProcessing();

Expand All @@ -123,11 +122,10 @@ public void testOpenCloseAndTimestamps() throws Exception {
* forwarded watermark must be the minimum of the watermarks of all active inputs.
*/
@Test
@SuppressWarnings("unchecked")
public void testWatermarkAndStreamStatusForwarding() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<String, Integer, String>(
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO,
Expand All @@ -140,7 +138,7 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -163,10 +161,10 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

// contrary to checkpoint barriers these elements are not blocked by watermarks
testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
expectedOutput.add(new StreamRecord<String>("42", initialTime));
testHarness.processElement(new StreamRecord<>("Hello", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("Hello", initialTime));
expectedOutput.add(new StreamRecord<>("42", initialTime));

testHarness.waitForInputProcessing();
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
Expand Down Expand Up @@ -236,22 +234,21 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
* This test verifies that checkpoint barriers are correctly forwarded.
*/
@Test
@SuppressWarnings("unchecked")
public void testCheckpointBarriers() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<String, Integer, String>(
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -261,21 +258,21 @@ public void testCheckpointBarriers() throws Exception {

// This element should be buffered since we received a checkpoint barrier on
// this input
testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);

// This one should go through
testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 1);
expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));

testHarness.waitForInputProcessing();

// These elements should be forwarded, since we did not yet receive a checkpoint barrier
// on that input, only add to same input, otherwise we would not know the ordering
// of the output since the Task might read the inputs in any order
testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<String>("11", initialTime));
expectedOutput.add(new StreamRecord<String>("111", initialTime));
testHarness.processElement(new StreamRecord<>(11, initialTime), 1, 1);
testHarness.processElement(new StreamRecord<>(111, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("11", initialTime));
expectedOutput.add(new StreamRecord<>("111", initialTime));

testHarness.waitForInputProcessing();

Expand Down Expand Up @@ -304,7 +301,7 @@ public void testCheckpointBarriers() throws Exception {

// now we should see the barrier and after that the buffered elements
expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));

TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput,
Expand All @@ -321,7 +318,6 @@ public void testCheckpointBarriers() throws Exception {
* then all inputs receive barriers from a later checkpoint.
*/
@Test
@SuppressWarnings("unchecked")
public void testOvertakingCheckpointBarriers() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
Expand All @@ -333,11 +329,11 @@ public void testOvertakingCheckpointBarriers() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -347,16 +343,16 @@ public void testOvertakingCheckpointBarriers() throws Exception {

// These elements should be buffered until we receive barriers from
// all inputs
testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0);

// These elements should be forwarded, since we did not yet receive a checkpoint barrier
// on that input, only add to same input, otherwise we would not know the ordering
// of the output since the Task might read the inputs in any order
testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<String>("42", initialTime));
expectedOutput.add(new StreamRecord<String>("1337", initialTime));
testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1);
testHarness.processElement(new StreamRecord<>(1337, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("42", initialTime));
expectedOutput.add(new StreamRecord<>("1337", initialTime));

testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements from non-blocked input
Expand All @@ -372,8 +368,8 @@ public void testOvertakingCheckpointBarriers() throws Exception {
testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);

expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()));

testHarness.waitForInputProcessing();
Expand Down Expand Up @@ -471,6 +467,7 @@ public InputSelection nextSelection() {
}

@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
Expand Down Expand Up @@ -592,7 +589,7 @@ public void testHandlingEndOfInput() throws Exception {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.invoke();
testHarness.waitForTaskRunning();
Expand Down Expand Up @@ -650,15 +647,15 @@ public void close() throws Exception {
}

@Override
public String map1(String value) throws Exception {
public String map1(String value) {
if (!openCalled) {
Assert.fail("Open was not called before run.");
}
return value;
}

@Override
public String map2(Integer value) throws Exception {
public String map2(Integer value) {
if (!openCalled) {
Assert.fail("Open was not called before run.");
}
Expand All @@ -670,12 +667,12 @@ private static class IdentityMap implements CoMapFunction<String, Integer, Strin
private static final long serialVersionUID = 1L;

@Override
public String map1(String value) throws Exception {
public String map1(String value) {
return value;
}

@Override
public String map2(Integer value) throws Exception {
public String map2(Integer value) {

return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -57,10 +56,8 @@
*/
public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {

private TypeInformation<IN1> inputType1;
private TypeSerializer<IN1> inputSerializer1;

private TypeInformation<IN2> inputType2;
private TypeSerializer<IN2> inputSerializer2;

private int[] inputGateAssignment;
Expand All @@ -81,10 +78,8 @@ public TwoInputStreamTaskTestHarness(

super(taskFactory, outputType);

this.inputType1 = inputType1;
inputSerializer1 = inputType1.createSerializer(executionConfig);

this.inputType2 = inputType2;
inputSerializer2 = inputType2.createSerializer(executionConfig);

this.numInputGates = numInputGates;
Expand All @@ -110,45 +105,45 @@ public TwoInputStreamTaskTestHarness(
protected void initializeInputs() throws IOException, InterruptedException {

inputGates = new StreamTestSingleInputGate[numInputGates];
List<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
List<StreamEdge> inPhysicalEdges = new LinkedList<>();

StreamOperator<IN1> dummyOperator = new AbstractStreamOperator<IN1>() {
private static final long serialVersionUID = 1L;
};

StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

for (int i = 0; i < numInputGates; i++) {

switch (inputGateAssignment[i]) {
case 1: {
inputGates[i] = new StreamTestSingleInputGate<IN1>(
inputGates[i] = new StreamTestSingleInputGate<>(
numInputChannelsPerGate,
bufferSize,
inputSerializer1);

StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
targetVertexDummy,
1,
new LinkedList<String>(),
new BroadcastPartitioner<Object>(),
new LinkedList<>(),
new BroadcastPartitioner<>(),
null /* output tag */);

inPhysicalEdges.add(streamEdge);
break;
}
case 2: {
inputGates[i] = new StreamTestSingleInputGate<IN2>(
inputGates[i] = new StreamTestSingleInputGate<>(
numInputChannelsPerGate,
bufferSize,
inputSerializer2);

StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
targetVertexDummy,
2,
new LinkedList<String>(),
new BroadcastPartitioner<Object>(),
new LinkedList<>(),
new BroadcastPartitioner<>(),
null /* output tag */);

inPhysicalEdges.add(streamEdge);
Expand Down

0 comments on commit 6779bd6

Please sign in to comment.