diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 8e4ce4823a702..e08baedeb4efe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -116,9 +116,7 @@ public void testNoManualRestart() throws Exception { eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); - for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { - vertex.getCurrentExecutionAttempt().cancelingComplete(); - } + completeCanceling(eg); assertEquals(JobStatus.FAILED, eg.getState()); @@ -128,6 +126,16 @@ public void testNoManualRestart() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); } + private void completeCanceling(ExecutionGraph eg) { + executeOperationForAllExecutions(eg, Execution::cancelingComplete); + } + + private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer operation) { + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + operation.accept(vertex.getCurrentExecutionAttempt()); + } + } + @Test public void testRestartAutomatically() throws Exception { RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 0L); @@ -257,9 +265,7 @@ public void testCancelWhileFailing() throws Exception { assertEquals(JobStatus.CANCELLING, graph.getState()); // let all tasks finish cancelling - for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { - vertex.getCurrentExecutionAttempt().cancelingComplete(); - } + completeCanceling(graph); assertEquals(JobStatus.CANCELED, graph.getState()); } @@ -270,11 +276,7 @@ public void testFailWhileCanceling() throws Exception { final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0; assertEquals(JobStatus.RUNNING, graph.getState()); - - // switch all tasks to running - for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { - vertex.getCurrentExecutionAttempt().switchToRunning(); - } + switchAllTasksToRunning(graph); graph.cancel(); @@ -285,13 +287,15 @@ public void testFailWhileCanceling() throws Exception { assertEquals(JobStatus.FAILING, graph.getState()); // let all tasks finish cancelling - for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { - vertex.getCurrentExecutionAttempt().cancelingComplete(); - } + completeCanceling(graph); assertEquals(JobStatus.FAILED, graph.getState()); } + private void switchAllTasksToRunning(ExecutionGraph graph) { + executeOperationForAllExecutions(graph, Execution::switchToRunning); + } + @Test public void testNoRestartOnSuppressException() throws Exception { final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0; @@ -302,9 +306,7 @@ public void testNoRestartOnSuppressException() throws Exception { assertEquals(JobStatus.FAILING, eg.getState()); - for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { - vertex.getCurrentExecutionAttempt().cancelingComplete(); - } + completeCanceling(eg); eg.waitUntilTerminal(); assertEquals(JobStatus.FAILED, eg.getState());