Skip to content

Commit

Permalink
[FLINK-19124][datastream] Remove ClassLoader parameter from JobClient…
Browse files Browse the repository at this point in the history
… methods
  • Loading branch information
SteNicholas authored and aljoscha committed Sep 25, 2020
1 parent 749f70d commit b692300
Show file tree
Hide file tree
Showing 39 changed files with 93 additions and 112 deletions.
2 changes: 1 addition & 1 deletion docs/dev/datastream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ just submitted. For instance, here is how to implement the semantics of
{% highlight java %}
final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
{% endhighlight %}

That last part about program execution is crucial to understanding when and how
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/datastream_api.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ just submitted. For instance, here is how to implement the semantics of
{% highlight java %}
final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
{% endhighlight %}

That last part about program execution is crucial to understanding when and how
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, Coor

private final JobID jobID;

public ClusterClientJobClientAdapter(final ClusterClientProvider<ClusterID> clusterClientProvider, final JobID jobID) {
private final ClassLoader classLoader;

public ClusterClientJobClientAdapter(final ClusterClientProvider<ClusterID> clusterClientProvider, final JobID jobID, final ClassLoader classLoader) {
this.jobID = checkNotNull(jobID);
this.clusterClientProvider = checkNotNull(clusterClientProvider);
this.classLoader = classLoader;
}

@Override
Expand Down Expand Up @@ -92,7 +95,7 @@ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDire
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
public CompletableFuture<Map<String, Object>> getAccumulators() {
checkNotNull(classLoader);

return bridgeClientRequest(
Expand All @@ -102,16 +105,16 @@ public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classL
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
checkNotNull(userClassloader);
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
checkNotNull(classLoader);

return bridgeClientRequest(
clusterClientProvider,
(clusterClient -> clusterClient
.requestJobResult(jobID)
.thenApply((jobResult) -> {
try {
return jobResult.toJobExecutionResult(userClassloader);
return jobResult.toJobExecutionResult(classLoader);
} catch (Throwable t) {
throw new CompletionException(
new ProgramInvocationException("Job failed", jobID, t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,19 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway

private final Time timeout;

private final ClassLoader classLoader;

public EmbeddedJobClient(
final JobID jobId,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor,
final Time rpcTimeout) {
final Time rpcTimeout,
final ClassLoader classLoader) {
this.jobId = checkNotNull(jobId);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.retryExecutor = checkNotNull(retryExecutor);
this.timeout = checkNotNull(rpcTimeout);
this.classLoader = classLoader;
}

@Override
Expand Down Expand Up @@ -98,7 +102,7 @@ public CompletableFuture<String> triggerSavepoint(@Nullable final String savepoi
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(final ClassLoader classLoader) {
public CompletableFuture<Map<String, Object>> getAccumulators() {
checkNotNull(classLoader);

return dispatcherGateway.requestJob(jobId, timeout)
Expand All @@ -113,14 +117,14 @@ public CompletableFuture<Map<String, Object>> getAccumulators(final ClassLoader
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
checkNotNull(userClassloader);
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
checkNotNull(classLoader);

final Time retryPeriod = Time.milliseconds(100L);
return JobStatusPollingUtils.getJobResult(dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
.thenApply((jobResult) -> {
try {
return jobResult.toJobExecutionResult(userClassloader);
return jobResult.toJobExecutionResult(classLoader);
} catch (Throwable t) {
throw new CompletionException(new Exception("Job " + jobId + " failed", t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDire
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
public CompletableFuture<Map<String, Object>> getAccumulators() {
throw new FlinkRuntimeException("The Accumulators cannot be fetched through the Job Client when in Web Submission.");
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
throw new FlinkRuntimeException("The Job Result cannot be fetched through the Job Client when in Web Submission.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Confi
.map(JobID::fromHexString);

if (optJobId.isPresent() && submittedJobIds.contains(optJobId.get())) {
return getJobClientFuture(optJobId.get());
return getJobClientFuture(optJobId.get(), userCodeClassloader);
}

return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
}

private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId) {
private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId, final ClassLoader userCodeClassloader) {
LOG.info("Job {} was recovered successfully.", jobId);
return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId, userCodeClassloader));
}

private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration, final ClassLoader userCodeClassloader) throws MalformedURLException {
Expand Down Expand Up @@ -130,7 +130,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline
userCodeClassloader);
return jobId;
}))
.thenApplyAsync(jobID -> jobClientCreator.getJobClient(actualJobId));
.thenApplyAsync(jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
}

private static CompletableFuture<JobID> submitJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
return new EmbeddedExecutor(
submittedJobIds,
dispatcherGateway,
jobId -> {
(jobId, userCodeClassloader) -> {
final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout);
return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface EmbeddedJobClientCreator {
/**
* Creates a {@link JobClient} that is adequate for the context in which the job is executed.
* @param jobId the job id of the job associated with the returned client.
* @param userCodeClassloader the class loader to deserialize user code.
* @return the job client.
*/
JobClient getJobClient(final JobID jobId);
JobClient getJobClient(final JobID jobId, final ClassLoader userCodeClassloader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
return new EmbeddedExecutor(
submittedJobIds,
dispatcherGateway,
WebSubmissionJobClient::new);
(jobId, userCodeClassloader) -> new WebSubmissionJobClient(jobId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @N
LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

return CompletableFuture.completedFuture(
new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @N
}))
.thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID))
jobID,
userCodeClassloader))
.whenComplete((ignored1, ignored2) -> clusterClient.close());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult(getUserCodeClassLoader());
jobClient.getJobExecutionResult();

if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader use
userCodeClassloader);
return submissionResult;
}))
.thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
.thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster, userCodeClassloader))
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
// We failed to create the JobClient and must shutdown to ensure cleanup.
Expand Down Expand Up @@ -147,14 +147,16 @@ private static final class PerJobMiniClusterJobClient implements JobClient, Coor
private final JobID jobID;
private final MiniCluster miniCluster;
private final CompletableFuture<JobResult> jobResultFuture;
private final ClassLoader classLoader;

private PerJobMiniClusterJobClient(JobID jobID, MiniCluster miniCluster) {
private PerJobMiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader) {
this.jobID = jobID;
this.miniCluster = miniCluster;
this.jobResultFuture = miniCluster
.requestJobResult(jobID)
// Make sure to shutdown the cluster when the job completes.
.whenComplete((result, throwable) -> shutDownCluster(miniCluster));
this.classLoader = classLoader;
}

@Override
Expand Down Expand Up @@ -183,15 +185,15 @@ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDire
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
return getJobExecutionResult(classLoader).thenApply(JobExecutionResult::getAllAccumulatorResults);
public CompletableFuture<Map<String, Object>> getAccumulators() {
return getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults);
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader classLoader) {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
return jobResultFuture.thenApply(result -> {
try {
return result.toJobExecutionResult(classLoader);
return result.toJobExecutionResult(this.classLoader);
} catch (Exception e) {
throw new CompletionException("Failed to convert JobResult to JobExecutionResult.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult(getUserClassloader());
jobClient.getJobExecutionResult();

if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public static void main(String[] args) throws Exception {
env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
JobClient jc = env.executeAsync();

jc.getJobExecutionResult(TestMultiExecute.class.getClassLoader());
jc.getJobExecutionResult();
}
}
}
Expand Down Expand Up @@ -429,7 +429,7 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
jobGraph.setClasspaths(accessor.getClasspaths());

final JobID jobID = clusterClient.submitJob(jobGraph).get();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID, classLoader));
};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void testJobExecution() throws Exception {

JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();

JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
assertThat(jobExecutionResult, is(notNullValue()));

Map<String, Object> actual = jobClient.getAccumulators(getClass().getClassLoader()).get();
Map<String, Object> actual = jobClient.getAccumulators().get();
assertThat(actual, is(notNullValue()));

assertThatMiniClusterIsShutdown();
Expand All @@ -87,7 +87,7 @@ public void testJobClient() throws Exception {
assertThrows(
"Job was cancelled.",
ExecutionException.class,
() -> jobClient.getJobExecutionResult(getClass().getClassLoader()).get()
() -> jobClient.getJobExecutionResult().get()
);

assertThatMiniClusterIsShutdown();
Expand Down Expand Up @@ -127,12 +127,12 @@ public void testMultipleExecutions() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
{
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsShutdown();
}
{
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsShutdown();
}
}
Expand All @@ -141,7 +141,7 @@ public void testMultipleExecutions() throws Exception {
public void testJobClientInteractionAfterShutdown() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsShutdown();

assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@ public interface JobClient {
* Requests the accumulators of the associated job. Accumulators can be requested while it is running
* or after it has finished. The class loader is used to deserialize the incoming accumulator results.
*/
CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader);
CompletableFuture<Map<String, Object>> getAccumulators();

/**
* Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
*
* @param userClassloader the classloader used to de-serialize the accumulators of the job.
*/
CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader);
CompletableFuture<JobExecutionResult> getJobExecutionResult();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void main(String[] args) throws Exception {

TableResult result = tEnv.executeSql(sqlStatement);
// wait job finish
result.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
result.getJobClient().get().getJobExecutionResult().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static void main(String[] args) throws Exception {
TableResult tableResult = resultTable.executeInsert(sinkTableName);
// wait job finish
tableResult.getJobClient().get()
.getJobExecutionResult(Thread.currentThread().getContextClassLoader())
.getJobExecutionResult()
.get();
System.out.println("[INFO]Run TPC-DS query " + queryId + " success.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ public JobExecutionResult execute(String jobName) throws Exception {

try {
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
lastJobExecutionResult = jobClient.getJobExecutionResult().get();
} else {
lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public CompletableFuture<JobStatus> getJobStatus() {
}

@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
}

Expand All @@ -65,7 +65,7 @@ public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDire
}

@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
public CompletableFuture<Map<String, Object>> getAccumulators() {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
}
Loading

0 comments on commit b692300

Please sign in to comment.