Skip to content

Commit

Permalink
[FLINK-11846] Don't delete HA job files in case of duplicate job submission
Browse files Browse the repository at this point in the history

This commit changes the cleanup logic of the Dispatcher to only clean up job HA files
if the job is not a duplicate (meaning that it is either running or has already been
executed by the same JobMaster).

This closes apache#7918.
  • Loading branch information
tillrohrmann committed Mar 7, 2019
1 parent 25169cf commit f2b0e49
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,40 +260,57 @@ private void stopDispatcherServices() throws Exception {

@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
return internalSubmitJob(jobGraph).whenCompleteAsync((acknowledge, throwable) -> {
if (throwable != null) {
cleanUpJobData(jobGraph.getJobID(), true);
log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

try {
if (isDuplicateJob(jobGraph.getJobID())) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
} else {
return internalSubmitJob(jobGraph);
}
}, getRpcService().getExecutor());
} catch (FlinkException e) {
return FutureUtils.completedExceptionally(e);
}
}

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
final JobID jobId = jobGraph.getJobID();

log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
/**
* Checks whether the given job has already been submitted or executed.
*
* @param jobId identifying the submitted job
* @return true if the job has already been submitted (is running) or has been executed
* @throws FlinkException if the job scheduling status cannot be retrieved
*/
private boolean isDuplicateJob(JobID jobId) throws FlinkException {
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;

try {
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e);
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());

return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
log.error("Failed to submit job {}.", jobId, strippedThrowable);
throw new CompletionException(
new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
});
}
return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId);
}

/**
 * Persists and runs the given job, cleaning up its job data (HA files, blobs)
 * if the submission fails.
 *
 * <p>Note: callers must have already filtered out duplicate submissions
 * (see the duplicate check in submitJob); otherwise the failure path here
 * would delete the HA files of an already-running job.
 *
 * @param jobGraph the job to persist and run
 * @return future acknowledging the successful submission, completed
 *         exceptionally with a JobSubmissionException on failure
 */
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
	log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

	// Defer persist+run until any still-terminating JobManager for this job id has shut down.
	final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
		.thenApply(ignored -> Acknowledge.get());

	return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
		if (throwable != null) {
			// Submission failed: remove the job's data (true -> also HA files) before reporting.
			cleanUpJobData(jobGraph.getJobID(), true);

			final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
			log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
			throw new CompletionException(
				new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
		} else {
			return acknowledge;
		}
	}, getRpcService().getExecutor());
}

private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ public void testBlobServerCleanupWhenJobFinished() throws Exception {
submitJob();

// complete the job
resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
terminationFuture.complete(null);
finishJob();

assertThatHABlobsHaveBeenRemoved();
}
Expand Down Expand Up @@ -358,6 +357,43 @@ public void testJobSubmissionUnderSameJobId() throws Exception {
assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
}

/**
 * Tests that a duplicate job submission won't delete any job meta data
 * (submitted job graphs, blobs, etc.).
 */
@Test
public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception {
	submitJob();

	final CompletableFuture<Acknowledge> duplicateSubmission = dispatcherGateway.submitJob(jobGraph, timeout);

	try {
		// The duplicate submission must be rejected with a JobSubmissionException.
		ExecutionException submissionFailure = null;
		try {
			duplicateSubmission.get();
		} catch (ExecutionException ee) {
			submissionFailure = ee;
		}

		if (submissionFailure == null) {
			fail("Expected a JobSubmissionFailure.");
		}
		assertThat(ExceptionUtils.findThrowable(submissionFailure, JobSubmissionException.class).isPresent(), is(true));

		// The running job's HA data must be untouched by the rejected duplicate.
		assertThatHABlobsHaveNotBeenRemoved();
	} finally {
		finishJob();
	}

	// Regular cleanup still happens once the (original) job finishes.
	assertThatHABlobsHaveBeenRemoved();
}

// Completes the mocked job with a FINISHED result and then signals JobManager
// termination, which triggers the dispatcher's regular job cleanup.
private void finishJob() {
	resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
	terminationFuture.complete(null);
}

// Asserts that no cleanup has been triggered: neither the per-job cleanup nor the
// HA blob deletion future has completed, and the job's blob file still exists on disk.
private void assertThatHABlobsHaveNotBeenRemoved() {
	assertThat(cleanupJobFuture.isDone(), is(false));
	assertThat(deleteAllHABlobsFuture.isDone(), is(false));
	assertThat(blobFile.exists(), is(true));
}

/**
* Tests that recovered jobs will only be started after the complete termination of any
* other previously running JobMasters for the same job.
Expand Down

0 comments on commit f2b0e49

Please sign in to comment.