Skip to content

Commit

Permalink
[hotfix] Refactor ExecutionGraphRestartTest to reuse cancellation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Oct 11, 2018
1 parent bf18ae7 commit 8ebedc6
Showing 1 changed file with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ public void testNoManualRestart() throws Exception {

eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));

for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
completeCanceling(eg);

assertEquals(JobStatus.FAILED, eg.getState());

Expand All @@ -128,6 +126,16 @@ public void testNoManualRestart() throws Exception {
assertEquals(JobStatus.FAILED, eg.getState());
}

private void completeCanceling(ExecutionGraph eg) {
executeOperationForAllExecutions(eg, Execution::cancelingComplete);
}

private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer<Execution> operation) {
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
operation.accept(vertex.getCurrentExecutionAttempt());
}
}

@Test
public void testRestartAutomatically() throws Exception {
RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 0L);
Expand Down Expand Up @@ -257,9 +265,7 @@ public void testCancelWhileFailing() throws Exception {
assertEquals(JobStatus.CANCELLING, graph.getState());

// let all tasks finish cancelling
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
completeCanceling(graph);

assertEquals(JobStatus.CANCELED, graph.getState());
}
Expand All @@ -270,11 +276,7 @@ public void testFailWhileCanceling() throws Exception {
final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;

assertEquals(JobStatus.RUNNING, graph.getState());

// switch all tasks to running
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().switchToRunning();
}
switchAllTasksToRunning(graph);

graph.cancel();

Expand All @@ -285,13 +287,15 @@ public void testFailWhileCanceling() throws Exception {
assertEquals(JobStatus.FAILING, graph.getState());

// let all tasks finish cancelling
for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
completeCanceling(graph);

assertEquals(JobStatus.FAILED, graph.getState());
}

private void switchAllTasksToRunning(ExecutionGraph graph) {
executeOperationForAllExecutions(graph, Execution::switchToRunning);
}

@Test
public void testNoRestartOnSuppressException() throws Exception {
final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
Expand All @@ -302,9 +306,7 @@ public void testNoRestartOnSuppressException() throws Exception {

assertEquals(JobStatus.FAILING, eg.getState());

for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
vertex.getCurrentExecutionAttempt().cancelingComplete();
}
completeCanceling(eg);

eg.waitUntilTerminal();
assertEquals(JobStatus.FAILED, eg.getState());
Expand Down

0 comments on commit 8ebedc6

Please sign in to comment.