Skip to content

Commit

Permalink
[FLINK-11343][tests] Force order that TaskExecutor shutdown after Tas…
Browse files Browse the repository at this point in the history
…k exit in TaskExecutorTest
  • Loading branch information
tisonkun authored and dawidwys committed Feb 26, 2019
1 parent d015ce7 commit 28cccde
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
Expand Down Expand Up @@ -715,11 +717,20 @@ public void testTaskSubmission() throws Exception {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.getFencingToken()).thenReturn(jobMasterId);

final OneShotLatch taskInTerminalState = new OneShotLatch();
final TaskManagerActions taskManagerActions = new NoOpTaskManagerActions() {
@Override
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
if (taskExecutionState.getExecutionState().isTerminal()) {
taskInTerminalState.trigger();
}
}
};
final JobManagerConnection jobManagerConnection = new JobManagerConnection(
jobId,
ResourceID.generate(),
jobMasterGateway,
mock(TaskManagerActions.class),
taskManagerActions,
mock(CheckpointResponder.class),
new TestGlobalAggregateManager(),
libraryCacheManager,
Expand Down Expand Up @@ -769,6 +780,8 @@ public void testTaskSubmission() throws Exception {
CompletableFuture<Boolean> completionFuture = TestInvokable.COMPLETABLE_FUTURE;

completionFuture.get();

taskInTerminalState.await();
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
Expand Down

0 comments on commit 28cccde

Please sign in to comment.