Skip to content

Commit

Permalink
[FLINK-10314] Making JobManagerRunner creation non-blocking in Dispat…
Browse files Browse the repository at this point in the history
…cher

The JobManagerRunner creation can be a blocking operation, e.g. if the CheckpointCoordinator
needs to access a FileSystem. Therefore, this operation should not be executed in the main thread
of the Dispatcher in order to not block this component.

This closes apache#6699.
  • Loading branch information
tillrohrmann committed Sep 14, 2018
1 parent 6872bbb commit d092fd7
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ private FunctionUtils() {
throw new UnsupportedOperationException("This class should never be instantiated.");
}

private static final Function<Object, Void> NULL_FN = ignored -> null;

/**
* Function which returns {@code null} (type: Void).
*
* @param <T> input type
* @return Function which returns {@code null}.
*/
@SuppressWarnings("unchecked")
public static <T> Function<T, Void> nullFn() {
return (Function<T, Void>) NULL_FN;
}

/**
* Convert at {@link FunctionWithException} into a {@link Function}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -110,7 +111,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme

private final FatalErrorHandler fatalErrorHandler;

private final Map<JobID, JobManagerRunner> jobManagerRunners;
private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;

private final LeaderElectionService leaderElectionService;

Expand Down Expand Up @@ -166,7 +167,7 @@ public Dispatcher(

this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

jobManagerRunners = new HashMap<>(16);
jobManagerRunnerFutures = new HashMap<>(16);

leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();

Expand Down Expand Up @@ -248,7 +249,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
Expand All @@ -257,58 +258,72 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
log.error("Failed to submit job {}.", jobId, strippedThrowable);
throw new CompletionException(
new JobSubmissionException(jobId, "Failed to submit job.", throwable));
new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
});
}
}

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));

try {
runJob(jobGraph);
} catch (Exception e) {
try {
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
} catch (Exception ie) {
e.addSuppressed(ie);
}

throw e;
}
}));
}

private void runJob(JobGraph jobGraph) throws Exception {
Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

jobManagerRunner.start();
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
return jobManagerRunnerFuture
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}

private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Exception {
final JobID jobId = jobGraph.getJobID();

final JobManagerRunner jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner(
ResourceID.generate(),
jobGraph,
configuration,
getRpcService(),
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler);
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();

final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
ResourceID.generate(),
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)),
rpcService.getExecutor());

return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}

private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunners.get(jobId)) {
if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
Expand All @@ -325,13 +340,15 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except
}
}, getMainThreadExecutor());

jobManagerRunner.start();

return jobManagerRunner;
}

@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
}

@Override
Expand Down Expand Up @@ -481,9 +498,9 @@ public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time ti

@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);

if (jobManagerRunner == null) {
if (jobManagerRunnerFuture == null) {
final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);

if (archivedExecutionGraph == null) {
Expand All @@ -492,7 +509,7 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout)
return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
}
} else {
return jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
}
}

Expand Down Expand Up @@ -566,11 +583,11 @@ private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
}

private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);

final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
if (jobManagerRunner != null) {
jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
if (jobManagerRunnerFuture != null) {
jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
} else {
jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -616,7 +633,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());

final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());

for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
Expand Down Expand Up @@ -739,16 +756,16 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
}

private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);

if (jobManagerRunner == null) {
if (jobManagerRunnerFuture == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
return leaderGatewayFuture.thenApplyAsync(
(JobMasterGateway jobMasterGateway) -> {
// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
if (jobManagerRunners.containsKey(jobId)) {
if (jobManagerRunnerFutures.containsKey(jobId)) {
return jobMasterGateway;
} else {
throw new CompletionException(new FlinkJobNotFoundException(jobId));
Expand All @@ -764,12 +781,12 @@ private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCo

@Nonnull
private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
final int numberJobsRunning = jobManagerRunners.size();
final int numberJobsRunning = jobManagerRunnerFutures.size();

ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
numberJobsRunning);

for (JobID jobId : jobManagerRunners.keySet()) {
for (JobID jobId : jobManagerRunnerFutures.keySet()) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
Expand Down Expand Up @@ -836,10 +853,10 @@ private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderS
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);

Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());

for (JobGraph recoveredJob : recoveredJobs) {
final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
runFutures.add(runFuture);
}

Expand All @@ -850,24 +867,24 @@ private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderS
}
}

private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); });

return jobManagerTerminationFuture.thenRunAsync(
ThrowingRunnable.unchecked(() -> {
return jobManagerTerminationFuture.thenComposeAsync(
FunctionUtils.uncheckedFunction((ignored) -> {
jobManagerTerminationFutures.remove(jobId);
action.accept(jobGraph);
return action.apply(jobGraph);
}),
getMainThreadExecutor());
}

CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
if (jobManagerRunners.containsKey(jobId)) {
if (jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
Expand Down Expand Up @@ -923,7 +940,7 @@ public void handleError(final Exception exception) {
public void onAddedJobGraph(final JobID jobId) {
runAsync(
() -> {
if (!jobManagerRunners.containsKey(jobId)) {
if (!jobManagerRunnerFutures.containsKey(jobId)) {
// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
// the specified job is already removed from the SubmittedJobGraphStore. In this case,
// SubmittedJobGraphStore.recoverJob returns null.
Expand Down Expand Up @@ -962,7 +979,7 @@ public void onAddedJobGraph(final JobID jobId) {
private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
final JobID jobId = jobGraph.getJobID();
if (jobManagerRunners.containsKey(jobId)) {
if (jobManagerRunnerFutures.containsKey(jobId)) {
// we must not release the job graph lock since it can only be locked once and
// is currently being executed. Once we support multiple locks, we must release
// the JobGraph here
Expand Down
Loading

0 comments on commit d092fd7

Please sign in to comment.