Skip to content

Commit

Permalink
[hotfix][test] Clean up the codes in StreamTaskTestHarness, StreamTas…
Browse files Browse the repository at this point in the history
…kTimerTest and TestProcessingTimeServiceTest

These cleanups include removing redundant initializer, unnecessary type parameter declarations and redundant
suppression, and so on.
  • Loading branch information
sunhaibotb authored and pnowojski committed Feb 21, 2020
1 parent 5fce538 commit fd1ee49
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ public void teardown() throws Exception {
@Test
public void testOpenCloseAndTimestamps() {
// first one spawns thread
timeService.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) {
}
});
timeService.registerTimer(System.currentTimeMillis(), timestamp -> {});

assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;

Expand All @@ -50,7 +49,7 @@ public void testCustomTimeServiceProvider() throws Throwable {

StreamConfig streamConfig = testHarness.getStreamConfig();

StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<>());
streamConfig.setStreamOperator(mapOperator);
streamConfig.setOperatorID(new OperatorID());

Expand All @@ -69,19 +68,9 @@ public void testCustomTimeServiceProvider() throws Throwable {
assertEquals(processingTimeService.getCurrentProcessingTime(), 16);

// register 2 tasks
processingTimeService.registerTimer(30, new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) {
processingTimeService.registerTimer(30, timestamp -> {});

}
});

processingTimeService.registerTimer(40, new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) {

}
});
processingTimeService.registerTimer(40, timestamp -> {});

assertEquals(2, tp.getNumActiveTimers());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
Expand Down Expand Up @@ -90,8 +89,8 @@ public class StreamTaskTestHarness<OUT> {

private final Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory;

public long memorySize = 0;
public int bufferSize = 0;
public long memorySize;
public int bufferSize;

protected StreamMockEnvironment mockEnv;
protected ExecutionConfig executionConfig;
Expand Down Expand Up @@ -148,7 +147,7 @@ public StreamTaskTestHarness(
streamConfig.setBufferTimeout(0);

outputSerializer = outputType.createSerializer(executionConfig);
outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
outputStreamRecordSerializer = new StreamElementSerializer<>(outputSerializer);

this.taskStateManager = new TestTaskStateManager(localRecoveryConfig);
}
Expand Down Expand Up @@ -181,9 +180,8 @@ public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot taskStateS
Collections.singletonMap(checkpointId, taskStateSnapshot));
}

@SuppressWarnings("unchecked")
private void initializeOutput() {
outputList = new LinkedBlockingQueue<Object>();
outputList = new LinkedBlockingQueue<>();
mockEnv.addOutput(outputList, outputStreamRecordSerializer);
}

Expand All @@ -200,7 +198,7 @@ public void setupOutputForSingletonOperatorChain() {
setupCalled = true;
streamConfig.setChainStart();
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
streamConfig.setOutputSelectors(Collections.emptyList());
streamConfig.setNumberOfOutputs(1);
streamConfig.setTypeSerializerOut(outputSerializer);
streamConfig.setVertexID(0);
Expand All @@ -210,11 +208,11 @@ public void setupOutputForSingletonOperatorChain() {
private static final long serialVersionUID = 1L;
};

List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
List<StreamEdge> outEdgesInOrder = new LinkedList<>();
StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */));

streamConfig.setOutEdgesInOrder(outEdgesInOrder);
streamConfig.setNonChainedOutputs(outEdgesInOrder);
Expand Down Expand Up @@ -267,8 +265,6 @@ public Thread invoke(StreamMockEnvironment mockEnv) throws Exception {

/**
* Waits for the task completion.
*
* @throws Exception
*/
public void waitForTaskCompletion() throws Exception {
waitForTaskCompletion(Long.MAX_VALUE);
Expand All @@ -279,7 +275,6 @@ public void waitForTaskCompletion() throws Exception {
* TimeoutException is thrown.
*
* @param timeout Timeout for the task completion
* @throws Exception
*/
public void waitForTaskCompletion(long timeout) throws Exception {
Preconditions.checkState(taskThread != null, "Task thread was not started.");
Expand All @@ -292,8 +287,6 @@ public void waitForTaskCompletion(long timeout) throws Exception {

/**
* Waits for the task to be running.
*
* @throws Exception
*/
public void waitForTaskRunning() throws Exception {
Preconditions.checkState(taskThread != null, "Task thread was not started.");
Expand Down Expand Up @@ -335,7 +328,7 @@ private void shutdownIOManager() throws Exception {
this.mockEnv.getIOManager().close();
}

private void shutdownMemoryManager() throws Exception {
private void shutdownMemoryManager() {
if (this.memorySize > 0) {
MemoryManager memMan = this.mockEnv.getMemoryManager();
if (memMan != null) {
Expand All @@ -348,15 +341,13 @@ private void shutdownMemoryManager() throws Exception {
/**
* Sends the element to input gate 0 on channel 0.
*/
@SuppressWarnings("unchecked")
public void processElement(Object element) {
inputGates[0].sendElement(element, 0);
}

/**
* Sends the element to the specified channel on the specified input gate.
*/
@SuppressWarnings("unchecked")
public void processElement(Object element, int inputGate, int channel) {
inputGates[inputGate].sendElement(element, channel);
}
Expand Down

0 comments on commit fd1ee49

Please sign in to comment.