Skip to content

Commit

Permalink
[hotfix] Replace finally block with JUnit After method in SourceOpera…
Browse files Browse the repository at this point in the history
…torTest.
  • Loading branch information
becketqin committed Nov 5, 2020
1 parent 4279a03 commit d143762
Showing 1 changed file with 34 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.util.CollectionUtil;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -74,6 +75,11 @@ public void setup() {
this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX, true /* emit progressive watermarks */);
}

@After
public void cleanUp() throws Exception {
operator.close();
}

@Test
public void testInitializeState() throws Exception {
StateInitializationContext stateContext = getStateContext();
Expand All @@ -88,55 +94,39 @@ public void testOpen() throws Exception {
operator.initializeState(getStateContext());
// Open the operator.
operator.open();
try {
// The source reader should have been assigned a split.
assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
// The source reader should have started.
assertTrue(mockSourceReader.isStarted());

// A ReaderRegistrationRequest should have been sent.
assertEquals(1, mockGateway.getEventsSent().size());
OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId());

}
finally {
operator.close();
}
// The source reader should have been assigned a split.
assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
// The source reader should have started.
assertTrue(mockSourceReader.isStarted());

// A ReaderRegistrationRequest should have been sent.
assertEquals(1, mockGateway.getEventsSent().size());
OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId());
assertTrue(mockSourceReader.isClosed());
}

@Test
public void testHandleAddSplitsEvent() throws Exception {
operator.initializeState(getStateContext());
operator.open();
try {
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
// The source reader should have been assigned two splits.
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
}
finally {
operator.close();
}
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
// The source reader should have been assigned two splits.
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
}

@Test
public void testHandleAddSourceEvent() throws Exception {
operator.initializeState(getStateContext());
operator.open();
try {
SourceEvent event = new SourceEvent() {
};
operator.handleOperatorEvent(new SourceEventWrapper(event));
// The source reader should have been assigned two splits.
assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
}
finally {
operator.close();
}
SourceEvent event = new SourceEvent() {
};
operator.handleOperatorEvent(new SourceEventWrapper(event));
// The source reader should have been assigned two splits.
assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
}

@Test
Expand All @@ -155,19 +145,14 @@ public void testSnapshotState() throws Exception {
StateInitializationContext stateContext = getStateContext();
operator.initializeState(stateContext);
operator.open();
try {
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

// Verify the splits in state.
List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get());
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
}
finally {
operator.close();
}
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

// Verify the splits in state.
List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get());
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
}

// ---------------- helper methods -------------------------
Expand Down

0 comments on commit d143762

Please sign in to comment.