Skip to content

Commit

Permalink
[FLINK-22197][tests] Shutdown cluster before collecting checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Apr 14, 2021
1 parent baf575c commit 0461e4b
Showing 1 changed file with 4 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.fail;

/** Base class for tests related to unaligned checkpoints. */
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
Expand Down Expand Up @@ -170,19 +169,14 @@ protected File execute(UnalignedSettings settings) throws Exception {
if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
throw e;
}
if (settings.generateCheckpoint) {
return Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint)
.max(Comparator.comparing(Path::toString))
.map(Path::toFile)
.orElseThrow(
() -> new IllegalStateException("Cannot generate checkpoint", e));
}
throw e;
} finally {
miniCluster.after();
}
if (settings.generateCheckpoint) {
fail("Could not generate checkpoint");
return Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint)
.max(Comparator.comparing(Path::toString))
.map(Path::toFile)
.orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
}
return null;
}
Expand Down

0 comments on commit 0461e4b

Please sign in to comment.