Skip to content

Commit

Permalink
[FLINK-15116] Make JobClient stateless, remove AutoCloseable
Browse files Browse the repository at this point in the history
With this change, the JobClient acquires the required ClusterClient for
each method call. This means that the users no longer have the burden of
managing the JobClient lifecycle, i.e. they can freely ignore the result
of executeAsync().
  • Loading branch information
aljoscha committed Dec 8, 2019
1 parent 6825f80 commit 1b99d53
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -63,31 +64,28 @@ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @N

final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

final ClusterClient<ClusterID> clusterClient = clusterDescriptor
.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())
.getClusterClient();
final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

final boolean withShutdownHook = !configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit();

if (withShutdownHook) {
Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
clusterClient::shutDownCluster, clusterClient.getClass().getSimpleName(), LOG);

return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) {
@Override
protected void doClose() {
ShutdownHookUtil.removeShutdownHook(shutdownHook, clusterClient.getClass().getSimpleName(), LOG);
clusterClient.close();
}
ShutdownHookUtil.addShutdownHook(
() -> {
try (ClusterClient<ClusterID> client = clusterClientProvider.getClusterClient()) {
client.shutDownCluster();
}
},
"Cluster shutdown hook for attached Job execution",
LOG);

return CompletableFuture.completedFuture(
new ClusterClientJobClientAdapter<ClusterID>(clusterClientProvider, jobGraph.getJobID()) {
});
} else {
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) {
@Override
protected void doClose() {
clusterClient.close();
}
});
return CompletableFuture.completedFuture(
new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -56,18 +57,14 @@ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @N
final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);

final ClusterClient<ClusterID> clusterClient = clusterDescriptor
.retrieve(clusterID)
.getClusterClient();

final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
return clusterClient
.submitJob(jobGraph)
.thenApply(jobID -> new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) {
@Override
protected void doClose() {
clusterClient.close();
}
});
.thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID))
.whenComplete((ignored1, ignored2) -> clusterClient.close());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;

import org.apache.commons.io.IOUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -44,15 +43,13 @@
*/
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {

private final ClusterClient<ClusterID> clusterClient;
private final ClusterClientProvider<ClusterID> clusterClientProvider;

private final JobID jobID;

private final AtomicBoolean running = new AtomicBoolean(true);

public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> clusterClient, final JobID jobID) {
public ClusterClientJobClientAdapter(final ClusterClientProvider<ClusterID> clusterClientProvider, final JobID jobID) {
this.jobID = checkNotNull(jobID);
this.clusterClient = checkNotNull(clusterClient);
this.clusterClientProvider = checkNotNull(clusterClientProvider);
}

@Override
Expand All @@ -62,62 +59,78 @@ public JobID getJobID() {

@Override
public CompletableFuture<JobStatus> getJobStatus() {
return clusterClient.getJobStatus(jobID);
return bridgeClientRequest(
clusterClientProvider,
(clusterClient -> clusterClient.getJobStatus(jobID)));
}

@Override
public CompletableFuture<Void> cancel() {
return clusterClient.cancel(jobID).thenApply(FunctionUtils.nullFn());
return bridgeClientRequest(
clusterClientProvider,
(clusterClient -> clusterClient.cancel(jobID).thenApply((ignored) -> null)));
}

@Override
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
return clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory);
return bridgeClientRequest(
clusterClientProvider,
(clusterClient ->
clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory)));
}

@Override
public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
return clusterClient.triggerSavepoint(jobID, savepointDirectory);
return bridgeClientRequest(
clusterClientProvider,
(clusterClient ->
clusterClient.triggerSavepoint(jobID, savepointDirectory)));
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
return clusterClient.getAccumulators(jobID, classLoader);
checkNotNull(classLoader);

return bridgeClientRequest(
clusterClientProvider,
(clusterClient ->
clusterClient.getAccumulators(jobID, classLoader)));
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
checkNotNull(userClassloader);

final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
return jobResultFuture.handle((jobResult, throwable) -> {
if (throwable != null) {
ExceptionUtils.checkInterrupted(throwable);
throw new CompletionException(new ProgramInvocationException("Could not run job", jobID, throwable));
} else {
try {
return jobResult.toJobExecutionResult(userClassloader);
} catch (JobExecutionException | IOException | ClassNotFoundException e) {
throw new CompletionException(new ProgramInvocationException("Job failed", jobID, e));
}
}
});
return bridgeClientRequest(
clusterClientProvider,
(clusterClient -> clusterClient
.requestJobResult(jobID)
.thenApply((jobResult) -> {
try {
return jobResult.toJobExecutionResult(userClassloader);
} catch (Throwable t) {
throw new CompletionException(
new ProgramInvocationException("Job failed", jobID, t));
}
})));
}

@Override
public final void close() {
if (running.compareAndSet(true, false)) {
doClose();
}
}
private static <T> CompletableFuture<T> bridgeClientRequest(
ClusterClientProvider<?> clusterClientProvider,
Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {

ClusterClient<?> clusterClient = clusterClientProvider.getClusterClient();

/**
* Method to be overridden by subclass which contains actual close actions.
*
* <p>We do close in this way to ensure multiple calls to {@link #close()}
* are executed at most once guarded by {@link #running} flag.
*/
protected void doClose() {
CompletableFuture<T> resultFuture;
try {
resultFuture = resultRetriever.apply(clusterClient);
} catch (Throwable throwable) {
IOUtils.closeQuietly(clusterClient::close);
return FutureUtils.completedExceptionally(throwable);
}

return resultFuture.whenCompleteAsync(
(jobResult, throwable) -> IOUtils.closeQuietly(clusterClient::close));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.client.deployment.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
Expand Down Expand Up @@ -62,15 +63,14 @@ public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration con
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

return clusterClient
.submitJob(jobGraph)
.thenApply(jobID -> new ClusterClientJobClientAdapter<MiniClusterClient.MiniClusterId>(clusterClient, jobID) {
@Override
protected void doClose() {
clusterClient.close();
shutdownMiniCluster(miniCluster);
}
});
CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

jobIdFuture
.thenCompose(clusterClient::requestJobResult)
.thenAccept((jobResult) -> clusterClient.shutDownCluster());

return jobIdFuture.thenApply(jobID ->
new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
}

private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public Executor getExecutor(@Nonnull Configuration configuration) {
jobGraph.setClasspaths(accessor.getClasspaths());

final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(clusterClient, jobID));
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* A client that is scoped to a specific job.
*/
@PublicEvolving
public interface JobClient extends AutoCloseable {
public interface JobClient {

/**
* Returns the {@link JobID} that uniquely identifies the job this client is scoped to.
Expand Down Expand Up @@ -83,7 +83,4 @@ public interface JobClient extends AutoCloseable {
* @param userClassloader the classloader used to de-serialize the accumulators of the job.
*/
CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader);

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,13 +804,15 @@ public JobExecutionResult execute() throws Exception {
* @throws Exception Thrown, if the program executions fails.
*/
public JobExecutionResult execute(String jobName) throws Exception {
try (final JobClient jobClient = executeAsync(jobName).get()) {
lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());
final JobClient jobClient = executeAsync(jobName).get();

return lastJobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
} else {
lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}

return lastJobExecutionResult;
}

/**
Expand All @@ -822,10 +824,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
*
* <p>The program execution will be logged and displayed with a generated default name.
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient implementation.
*
* @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception Thrown, if the program submission fails.
*/
Expand All @@ -843,10 +841,6 @@ public final CompletableFuture<JobClient> executeAsync() throws Exception {
*
* <p>The program execution will be logged and displayed with the given job name.
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient implementation.
*
* @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception Thrown, if the program submission fails.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,4 @@ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDire
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1619,10 +1619,12 @@ public JobExecutionResult execute(String jobName) throws Exception {
*/
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
try (final JobClient jobClient = executeAsync(streamGraph).get()) {
return configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());
final JobClient jobClient = executeAsync(streamGraph).get();

if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
return jobClient.getJobExecutionResult(userClassloader).get();
} else {
return new DetachedJobExecutionResult(jobClient.getJobID());
}
}

Expand All @@ -1634,10 +1636,6 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
* <p>The program execution will be logged and displayed with a generated
* default name.
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient implementation.
*
* @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception which occurs during job execution.
*/
Expand All @@ -1653,10 +1651,6 @@ public final CompletableFuture<JobClient> executeAsync() throws Exception {
*
* <p>The program execution will be logged and displayed with the provided name
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient implementation.
*
* @param jobName desired name of the job
* @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception which occurs during job execution.
Expand Down
Loading

0 comments on commit 1b99d53

Please sign in to comment.