Skip to content

Commit

Permalink
[FLINK-8900] [yarn] Set correct application status when job is finished
Browse files Browse the repository at this point in the history
This closes apache#5944
  • Loading branch information
StephanEwen committed May 3, 2018
1 parent 61f6504 commit 545d530
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -180,6 +181,9 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
});
}

/**
 * No-op implementation: nothing is registered on the given termination future, so its
 * completion triggers no action in this entrypoint.
 *
 * @param terminationFuture future carrying the {@link ApplicationStatus}; ignored here
 */
@Override
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}

public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, MesosJobClusterEntrypoint.class.getSimpleName(), args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.clusterframework;

import org.apache.flink.runtime.jobgraph.JobStatus;

/**
* The status of an application.
*/
Expand Down Expand Up @@ -52,4 +54,27 @@ public int processExitCode() {
return processExitCode;
}

/**
 * Maps a job status to the application status that should be reported for it.
 *
 * <p>{@code FINISHED} maps to {@link #SUCCEEDED}, {@code CANCELED} to {@link #CANCELED} and
 * {@code FAILED} to {@link #FAILED}. Every other job status, as well as {@code null},
 * maps to {@link #UNKNOWN}.
 *
 * @param jobStatus the job status to translate, may be {@code null}
 * @return the corresponding application status, never {@code null}
 */
public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
	// a switch on a null enum would throw, so handle the missing status up front
	if (jobStatus == null) {
		return UNKNOWN;
	}

	switch (jobStatus) {
		case FINISHED:
			return SUCCEEDED;
		case CANCELED:
			return CANCELED;
		case FAILED:
			return FAILED;
		default:
			return UNKNOWN;
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
Expand All @@ -40,6 +41,8 @@

import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Mini Dispatcher which is instantiated as the dispatcher component by the {@link JobClusterEntrypoint}.
*
Expand All @@ -52,6 +55,8 @@ public class MiniDispatcher extends Dispatcher {

private final JobClusterEntrypoint.ExecutionMode executionMode;

private final CompletableFuture<ApplicationStatus> jobTerminationFuture;

public MiniDispatcher(
RpcService rpcService,
String endpointId,
Expand Down Expand Up @@ -84,7 +89,12 @@ public MiniDispatcher(
fatalErrorHandler,
restAddress);

this.executionMode = executionMode;
this.executionMode = checkNotNull(executionMode);
this.jobTerminationFuture = new CompletableFuture<>();
}

/**
 * Returns the future that this dispatcher completes with the {@link ApplicationStatus}
 * of its job. The future is created uncompleted in the constructor.
 *
 * @return the job termination future
 */
public CompletableFuture<ApplicationStatus> getJobTerminationFuture() {
return jobTerminationFuture;
}

@Override
Expand All @@ -109,7 +119,12 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout)

if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
// terminate the MiniDispatcher once we served the first JobResult successfully
jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
jobResultFuture.thenAccept((JobResult result) -> {
ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

jobTerminationFuture.complete(status);
});
}

return jobResultFuture;
Expand All @@ -121,7 +136,7 @@ protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedEx

if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
// shut down since we don't have to wait for the execution result retrieval
shutDown();
jobTerminationFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
}
}

Expand All @@ -130,6 +145,6 @@ protected void jobNotFinished(JobID jobId) {
super.jobNotFinished(jobId);

// shut down since we have done our job
shutDown();
jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ protected void runCluster(Configuration configuration) throws Exception {
LOG.info("Could not properly terminate the Dispatcher.", throwable);
}

// This is the general shutdown path. If a separate, more specific shutdown was
// already triggered, this will do nothing.
shutDownAndTerminate(
SUCCESS_RETURN_CODE,
ApplicationStatus.SUCCEEDED,
Expand Down Expand Up @@ -578,7 +580,7 @@ private CompletableFuture<Void> shutDownAsync(
return terminationFuture;
}

private void shutDownAndTerminate(
protected void shutDownAndTerminate(
int returnCode,
ApplicationStatus applicationStatus,
@Nullable String diagnostics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
Expand All @@ -44,6 +45,7 @@

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
Expand Down Expand Up @@ -106,7 +108,7 @@ protected Dispatcher createDispatcher(

final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);

return new MiniDispatcher(
final MiniDispatcher dispatcher = new MiniDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME,
configuration,
Expand All @@ -122,7 +124,13 @@ protected Dispatcher createDispatcher(
restAddress,
jobGraph,
executionMode);

registerShutdownActions(dispatcher.getJobTerminationFuture());

return dispatcher;
}

protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;

protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -178,8 +179,8 @@ public void testSingleJobRecovery() throws Exception {
}

/**
* Tests that in detached mode, the {@link MiniDispatcher} will terminate after the job
* has completed.
* Tests that in detached mode, the {@link MiniDispatcher} will complete the future that
* signals job termination.
*/
@Test
public void testTerminationAfterJobCompletion() throws Exception {
Expand All @@ -197,7 +198,7 @@ public void testTerminationAfterJobCompletion() throws Exception {
resultFuture.complete(archivedExecutionGraph);

// wait until the job termination future is completed
miniDispatcher.getTerminationFuture().get();
miniDispatcher.getJobTerminationFuture().get();
} finally {
RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
}
Expand All @@ -222,9 +223,7 @@ public void testJobResultRetrieval() throws Exception {

resultFuture.complete(archivedExecutionGraph);

final CompletableFuture<Void> terminationFuture = miniDispatcher.getTerminationFuture();

assertThat(terminationFuture.isDone(), is(false));
assertFalse(miniDispatcher.getTerminationFuture().isDone());

final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class);

Expand All @@ -233,9 +232,8 @@ public void testJobResultRetrieval() throws Exception {
final JobResult jobResult = jobResultFuture.get();

assertThat(jobResult.getJobId(), is(jobGraph.getJobID()));

terminationFuture.get();
} finally {
}
finally {
RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.yarn.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Entry point for Yarn per-job clusters.
Expand Down Expand Up @@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
}
}

/**
 * Registers a callback on the given future that shuts this entrypoint down once the
 * future completes normally, using the delivered status and its process exit code.
 *
 * @param terminationFuture future that delivers the {@link ApplicationStatus} to shut down with
 */
@Override
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
	terminationFuture.thenAccept(applicationStatus -> {
		final int exitCode = applicationStatus.processExitCode();
		shutDownAndTerminate(exitCode, applicationStatus, null, true);
	});
}

// ------------------------------------------------------------------------
// The executable entry point for the Yarn Application Master Process
// for a single Flink job.
// ------------------------------------------------------------------------

public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
Expand Down

0 comments on commit 545d530

Please sign in to comment.