Skip to content

Commit

Permalink
[hotfix][test] Clean up the test code in SourceStreamTaskTest and One…
Browse files Browse the repository at this point in the history
…InputStreamTaskTest

These cleanups include removing unnecessary type parameter declarations and redundant suppression, etc.
  • Loading branch information
sunhaibotb authored and zhijiangW committed Oct 28, 2019
1 parent d4be257 commit 8543334
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ public void testOpenCloseAndTimestamps() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
StreamMap<String, String> mapOperator = new StreamMap<>(new TestOpenCloseMapFunction());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());

long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.invoke();
testHarness.waitForTaskRunning();

testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1));
testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2));
expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1));
testHarness.processElement(new StreamRecord<>("Ciao", initialTime + 2));
expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1));
expectedOutput.add(new StreamRecord<>("Ciao", initialTime + 2));

testHarness.waitForInputProcessing();

Expand All @@ -143,7 +143,6 @@ public void testOpenCloseAndTimestamps() throws Exception {
* forwarded watermark must be the minimum of the watermarks of all active inputs.
*/
@Test
@SuppressWarnings("unchecked")
public void testWatermarkAndStreamStatusForwarding() throws Exception {

final OneInputStreamTaskTestHarness<String, String> testHarness =
Expand All @@ -155,11 +154,11 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -183,10 +182,10 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
testHarness.getOutput());

// contrary to checkpoint barriers these elements are not blocked by watermarks
testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
testHarness.processElement(new StreamRecord<String>("Ciao", initialTime));
expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao", initialTime));
testHarness.processElement(new StreamRecord<>("Hello", initialTime));
testHarness.processElement(new StreamRecord<>("Ciao", initialTime));
expectedOutput.add(new StreamRecord<>("Hello", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao", initialTime));

testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
Expand Down Expand Up @@ -277,7 +276,7 @@ public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {

// --------------------- begin test ---------------------

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.invoke();
testHarness.waitForTaskRunning();
Expand Down Expand Up @@ -367,11 +366,11 @@ public void testCheckpointBarriers() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -381,16 +380,16 @@ public void testCheckpointBarriers() throws Exception {

// These elements should be buffered until we receive barriers from
// all inputs
testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0);

// These elements should be forwarded, since we did not yet receive a checkpoint barrier
// on that input, only add to same input, otherwise we would not know the ordering
// of the output since the Task might read the inputs in any order
testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1);
testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));

testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements from non-blocked input
Expand All @@ -404,8 +403,8 @@ public void testCheckpointBarriers() throws Exception {

// now we should see the barrier and after that the buffered elements
expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));

testHarness.endInput();

Expand All @@ -430,11 +429,11 @@ public void testOvertakingCheckpointBarriers() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
long initialTime = 0L;

testHarness.invoke();
Expand All @@ -444,16 +443,16 @@ public void testOvertakingCheckpointBarriers() throws Exception {

// These elements should be buffered until we receive barriers from
// all inputs
testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0);

// These elements should be forwarded, since we did not yet receive a checkpoint barrier
// on that input, only add to same input, otherwise we would not know the ordering
// of the output since the Task might read the inputs in any order
testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1);
testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));

testHarness.waitForInputProcessing();
// we should not yet see the barrier, only the two elements from non-blocked input
Expand All @@ -467,8 +466,8 @@ public void testOvertakingCheckpointBarriers() throws Exception {
testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);

expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()));

testHarness.waitForInputProcessing();
Expand Down Expand Up @@ -708,6 +707,7 @@ public void processElement(StreamRecord<String> element) {
}

@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Expand Down Expand Up @@ -797,7 +797,7 @@ public void processElement(StreamRecord<String> element) throws Exception {
}

@Override
public void processWatermark(Watermark mark) throws Exception {
public void processWatermark(Watermark mark) {
output.emitWatermark(new Watermark(mark.getTimestamp() * 2));
}
}
Expand Down Expand Up @@ -848,7 +848,7 @@ private void configureChainedTestingStreamOperator(
null
),
0,
Collections.<String>emptyList(),
Collections.emptyList(),
null,
null
);
Expand Down Expand Up @@ -921,9 +921,10 @@ public void processElement(StreamRecord<IN> element) throws Exception {
}
}


// This must only be used in one test, otherwise the static fields will be changed
// by several tests concurrently
/**
* This must only be used in one test, otherwise the static fields will be changed
* by several tests concurrently.
*/
private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class SourceStreamTaskTest {
* This test verifies that open() and close() are correctly called by the StreamTask.
*/
@Test
@SuppressWarnings("unchecked")
public void testOpenClose() throws Exception {
final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(
SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
Expand Down Expand Up @@ -288,7 +287,6 @@ public MockSource(int maxElements, int checkpointDelay, int readDelay) {

@Override
public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
final Object lockObject = ctx.getCheckpointLock();
while (isRunning && count < maxElements) {
// simulate some work
try {
Expand All @@ -299,8 +297,8 @@ public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
Thread.currentThread().interrupt();
}

synchronized (lockObject) {
ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
synchronized (ctx.getCheckpointLock()) {
ctx.collect(new Tuple2<>(lastCheckpointId, count));
count++;
}
}
Expand Down Expand Up @@ -330,7 +328,7 @@ public List<Serializable> snapshotState(long checkpointId, long timestamp) throw
Assert.fail("Count is different at start end end of snapshot.");
}
semaphore.release();
return Collections.<Serializable>singletonList(sum);
return Collections.singletonList(sum);
}

@Override
Expand Down

0 comments on commit 8543334

Please sign in to comment.