Skip to content

Commit

Permalink
[hotfix][coordination] Reduce lambda nesting for action on Completabl…
Browse files Browse the repository at this point in the history
…eFuture

Directly use 'whenCompleteAsync()' with an executor, rather than 'whenComplete()' and nest a call
to submit to the executor.
  • Loading branch information
StephanEwen committed Apr 16, 2021
1 parent 2a225d6 commit b1e1860
Showing 1 changed file with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,23 +276,21 @@ private void checkpointCoordinatorInternal(

final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();

coordinatorCheckpoint.whenComplete(
(success, failure) ->
mainThreadExecutor.execute(
() -> {
if (failure != null) {
result.completeExceptionally(failure);
} else if (eventValve.tryShutValve(checkpointId)) {
result.complete(success);
} else {
// if we cannot shut the valve, this means the checkpoint
// has been aborted before, so the future is already
// completed exceptionally. but we try to complete it here
// again, just in case, as a safety net.
result.completeExceptionally(
new FlinkException("Cannot shut event valve"));
}
}));
coordinatorCheckpoint.whenCompleteAsync(
(success, failure) -> {
if (failure != null) {
result.completeExceptionally(failure);
} else if (eventValve.tryShutValve(checkpointId)) {
result.complete(success);
} else {
// if we cannot shut the valve, this means the checkpoint
// has been aborted before, so the future is already
// completed exceptionally. but we try to complete it here
// again, just in case, as a safety net.
result.completeExceptionally(new FlinkException("Cannot shut event valve"));
}
},
mainThreadExecutor);

try {
eventValve.markForCheckpoint(checkpointId);
Expand Down

0 comments on commit b1e1860

Please sign in to comment.