Skip to content

Commit

Permalink
[hotfix][config] Remove CheckpointConfig#enableUnalignedCheckpoints w…
Browse files Browse the repository at this point in the history
…ithout parameters.

CheckpointConfig#enableUnalignedCheckpoints(boolean) makes it explicit and also is more future-proof. When unaligned checkpoints become the default, this method will be mostly useless and we would need to add a #disableUnalignedCheckpoints() for consistency.
  • Loading branch information
Arvid Heise authored and pnowojski committed Jun 22, 2020
1 parent 6d52d04 commit 59979b2
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

setupEnvironment(env, pt);
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().enableUnalignedCheckpoints(true);

if (isOriginalJobVariant(pt)) {
executeOriginalVariant(env, pt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,21 +398,6 @@ public void enableUnalignedCheckpoints(boolean enabled) {
unalignedCheckpointsEnabled = enabled;
}

/**
* Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
*
* <p>Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows
* checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the
* current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
*
* <p>Unaligned checkpoints can only be enabled if {@link #checkpointingMode} is
* {@link CheckpointingMode#EXACTLY_ONCE}.
*/
@PublicEvolving
public void enableUnalignedCheckpoints() {
enableUnalignedCheckpoints(true);
}

/**
* Returns whether checkpoints should be persisted externally.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private LocalStreamEnvironment createEnv(int parallelism, int slotsPerTaskManage
env.enableCheckpointing(100);
// keep in sync with FailingMapper in #createDAG
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(100)));
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
return env;
}

Expand Down

0 comments on commit 59979b2

Please sign in to comment.