Skip to content

Commit

Permalink
[FLINK-8224] [flip6] Shutdown application when job terminated in job mode
Browse files Browse the repository at this point in the history

This closes apache#5139.
  • Loading branch information
tiemsn authored and tillrohrmann committed Jan 25, 2018
1 parent c1734f4 commit a4ecc7f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -371,7 +373,7 @@ public void postStop() throws Exception {
@Override
protected void shutDownApplication(
ApplicationStatus finalStatus,
String optionalDiagnostics) throws ResourceManagerException {
@Nullable String optionalDiagnostics) throws ResourceManagerException {
LOG.info("Shutting down and unregistering as a Mesos framework.");

Exception exception = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;

import akka.actor.ActorSystem;

Expand Down Expand Up @@ -258,8 +260,15 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
}
}

private void shutDownAndTerminate(boolean cleanupHaData) {
private void shutDownAndTerminate(
boolean cleanupHaData,
ApplicationStatus status,
@Nullable String optionalDiagnostics) {
try {
if (resourceManager != null) {
resourceManager.shutDownCluster(status, optionalDiagnostics);
}

shutDown(cleanupHaData);
} catch (Throwable t) {
LOG.error("Could not properly shut down cluster entrypoint.", t);
Expand Down Expand Up @@ -292,23 +301,27 @@ private TerminatingOnCompleteActions(JobID jobId) {
public void jobFinished(JobResult result) {
LOG.info("Job({}) finished.", jobId);

shutDownAndTerminate(true);
shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null);
}

@Override
public void jobFailed(JobResult result) {
checkArgument(result.getSerializedThrowable().isPresent());

LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage());
final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get();

final String errorMessage = serializedThrowable.getMessage();

LOG.info("Job({}) failed: {}.", jobId, errorMessage);

shutDownAndTerminate(false);
shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage);
}

@Override
public void jobFinishedByOther() {
LOG.info("Job({}) was finished by another JobManager.", jobId);

shutDownAndTerminate(false);
shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -479,10 +481,12 @@ public void unRegisterInfoMessageListener(final String address) {
* Cleanup application and shut down cluster.
*
* @param finalStatus of the Flink application
* @param optionalDiagnostics for the Flink application
* @param optionalDiagnostics diagnostics message for the Flink application or {@code null}
*/
@Override
public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
public void shutDownCluster(
final ApplicationStatus finalStatus,
@Nullable final String optionalDiagnostics) {
log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);

try {
Expand Down Expand Up @@ -930,10 +934,12 @@ public void handleError(final Exception exception) {
* yet are returned.
*
* @param finalStatus The application status to report.
* @param optionalDiagnostics An optional diagnostics message.
* @param optionalDiagnostics A diagnostics message or {@code null}.
* @throws ResourceManagerException if the application could not be shut down.
*/
protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException;
protected abstract void shutDownApplication(
ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) throws ResourceManagerException;

/**
* Allocates a resource using the resource profile.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

import javax.annotation.Nullable;

/**
* A standalone implementation of the resource manager. Used when the system is started in
* standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
*
* This ResourceManager doesn't acquire new resources.
* <p>This ResourceManager doesn't acquire new resources.
*/
public class StandaloneResourceManager extends ResourceManager<ResourceID> {

Expand Down Expand Up @@ -67,7 +69,7 @@ protected void initialize() throws ResourceManagerException {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

import javax.annotation.Nullable;

/**
* Simple {@link ResourceManager} implementation for testing purposes.
*/
Expand All @@ -54,7 +56,7 @@ protected void initialize() throws ResourceManagerException {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException {
protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,13 @@ public void postStop() throws Exception {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
protected void shutDownApplication(
ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) {

// first, de-register from YARN
FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
log.info("Unregister application from the YARN Resource Manager");
log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);

try {
resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
Expand Down

0 comments on commit a4ecc7f

Please sign in to comment.