Skip to content

Commit

Permalink
[FLINK-14496][client] Exclude detach flag from ClusterClient
Browse files Browse the repository at this point in the history
This closes apache#9972 .
  • Loading branch information
tisonkun committed Nov 1, 2019
1 parent 09f2f43 commit bf5235e
Show file tree
Hide file tree
Showing 37 changed files with 247 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,9 +42,12 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarFile;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Utility functions for Flink client.
*/
Expand Down Expand Up @@ -95,15 +102,61 @@ public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> cla
return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
}

public static JobExecutionResult submitJob(
ClusterClient<?> client,
JobGraph jobGraph) throws ProgramInvocationException {
checkNotNull(client);
checkNotNull(jobGraph);
try {
return client
.submitJob(jobGraph)
.thenApply(JobSubmissionResult::getJobID)
.thenApply(DetachedJobExecutionResult::new)
.get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
}
}

public static JobExecutionResult submitJobAndWaitForResult(
ClusterClient<?> client,
JobGraph jobGraph,
ClassLoader classLoader) throws ProgramInvocationException {
checkNotNull(client);
checkNotNull(jobGraph);
checkNotNull(classLoader);

JobResult jobResult;

try {
jobResult = client
.submitJob(jobGraph)
.thenApply(JobSubmissionResult::getJobID)
.thenCompose(client::requestJobResult)
.get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
}

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobExecutionException | IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
}

public static JobSubmissionResult executeProgram(
ClusterClient<?> client,
PackagedProgram program,
int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
ClusterClient<?> client,
PackagedProgram program,
int parallelism,
boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

LOG.info("Starting program (detached: {})", client.isDetached());
LOG.info("Starting program (detached: {})", detached);

final List<URL> libraries = program.getAllLibraries();

Expand All @@ -115,7 +168,7 @@ public static JobSubmissionResult executeProgram(
program.getClasspaths(),
program.getUserCodeClassLoader(),
parallelism,
client.isDetached(),
detached,
program.getSavepointSettings(),
jobExecutionResult);
ContextEnvironment.setAsContext(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private JobExecutionResult executePlanWithJars(JobGraph jobGraph, ClassLoader cl
try (ClusterClient<?> client = new RestClusterClient<>(
clientConfiguration,
"RemoteExecutor")) {
return client.submitJob(jobGraph, classLoader).getJobExecutionResult();
return ClientUtils.submitJobAndWaitForResult(client, jobGraph, classLoader).getJobExecutionResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,15 @@ private <ClusterID> void runProgram(
}

try {
client.setDetached(executionParameters.getDetachedMode());

int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
userParallelism = defaultParallelism;
}

executeProgram(program, client, userParallelism);
executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
} finally {
if (clusterId == null && !client.isDetached()) {
if (clusterId == null && !executionParameters.getDetachedMode()) {
// terminate the cluster only if we have started it before and if it's not detached
try {
client.shutDownCluster();
Expand Down Expand Up @@ -745,10 +743,14 @@ private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPa
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
protected void executeProgram(
PackagedProgram program,
ClusterClient<?> client,
int parallelism,
boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
logAndSysout("Starting execution of program");

JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism);
JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism, detached);

if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
*/
public abstract class ClusterClient<T> implements AutoCloseable {

/** Switch for blocking/detached job submission of the client. */
private boolean detachedJobSubmission = false;

/**
* User overridable hook to close the client, possibly closes internal services.
* @deprecated use the {@link #close()} instead. This method stays for backwards compatibility.
Expand Down Expand Up @@ -153,36 +150,12 @@ public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws
*/
public abstract T getClusterId();

/**
* Set the mode of this client (detached or blocking job execution).
* @param isDetached If true, the client will submit programs detached via the {@code run} method
*/
public void setDetached(boolean isDetached) {
this.detachedJobSubmission = isDetached;
}

/**
* A flag to indicate whether this clients submits jobs detached.
* @return True if the Client submits detached, false otherwise
*/
public boolean isDetached() {
return detachedJobSubmission;
}

/**
* Return the Flink configuration object.
* @return The Flink configuration object
*/
public abstract Configuration getFlinkConfiguration();

/**
* Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
* some custom job submission logic.
* @param jobGraph The JobGraph to be submitted
* @return JobSubmissionResult
*/
public abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;

/**
* Submit the given {@link JobGraph} to the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach);
jobGraph.setClasspaths(this.classpathsToAttach);

lastJobExecutionResult = client
.submitJob(jobGraph, this.userCodeClassLoader)
.getJobExecutionResult();
if (detached) {
lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
} else {
lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
}

setJobExecutionResult(lastJobExecutionResult);

Expand Down Expand Up @@ -132,6 +134,10 @@ public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}

public boolean isDetached() {
return detached;
}

// --------------------------------------------------------------------------------------------

public static void setAsContext(ContextEnvironmentFactory factory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,23 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Client to interact with a {@link MiniCluster}.
Expand All @@ -61,40 +57,6 @@ public Configuration getFlinkConfiguration() {
return new Configuration(configuration);
}

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);

if (isDetached()) {
try {
final JobSubmissionResult jobSubmissionResult = jobSubmissionResultFuture.get();
return new DetachedJobExecutionResult(jobSubmissionResult.getJobID());
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);

throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
}
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
(JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));

final JobResult jobResult;
try {
jobResult = jobResultFuture.get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);

throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
}

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobExecutionException | IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
}
}

@Override
public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
return miniCluster.submitJob(jobGraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand Down Expand Up @@ -232,43 +229,6 @@ public void close() {
}
}

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
LOG.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

if (isDetached()) {
try {
final JobSubmissionResult jobSubmissionResult = jobSubmissionFuture.get();

LOG.warn("Job was executed in detached mode, the results will be available on completion.");

return new DetachedJobExecutionResult(jobSubmissionResult.getJobID());
} catch (Exception e) {
throw new ProgramInvocationException("Could not submit job",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
ignored -> requestJobResult(jobGraph.getJobID()));

final JobResult jobResult;
try {
jobResult = jobResultFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not retrieve the execution result.",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobExecutionException | IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
}
}
}

/**
* Requests the job details.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ private RunTestingCliFrontend(
}

@Override
protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
assertEquals(isDetached, client.isDetached());
protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism, boolean detached) {
assertEquals(isDetached, detached);
assertEquals(expectedParallelism, parallelism);
}
}
Expand Down
Loading

0 comments on commit bf5235e

Please sign in to comment.