Skip to content

Commit

Permalink
[FLINK-8775] [flip6] Non blocking MiniCluster shut down
Browse files Browse the repository at this point in the history
This closes apache#5576.
  • Loading branch information
tillrohrmann committed Feb 24, 2018
1 parent c9787b6 commit 4e7f03e
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,22 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

if (strippedThrowable instanceof JobNotFinishedException) {
jobNotFinished(jobId);
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunners.get(jobId)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable));
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

if (strippedThrowable instanceof JobNotFinishedException) {
jobNotFinished(jobId);
} else {
onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable));
}
}
} else {
log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
}
}, getMainThreadExecutor());

Expand Down Expand Up @@ -294,6 +300,9 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
if (jobManagerRunners.isEmpty()) {
System.out.println("empty");
}
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
}
Expand Down
Loading

0 comments on commit 4e7f03e

Please sign in to comment.