Skip to content

Commit

Permalink
[FLINK-15299][test] Move ClientUtils#submitJob & ClientUtils#submitJo…
Browse files Browse the repository at this point in the history
…bAndWaitForResult to test scope

This closes apache#11469 .
  • Loading branch information
tisonkun committed Aug 19, 2020
1 parent b6592fc commit dfb8a3b
Show file tree
Hide file tree
Showing 28 changed files with 238 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,22 @@

package org.apache.flink.client;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -82,49 +73,6 @@ public static URLClassLoader buildUserCodeClassLoader(
checkClassloaderLeak);
}

public static JobExecutionResult submitJob(
ClusterClient<?> client,
JobGraph jobGraph) throws ProgramInvocationException {
checkNotNull(client);
checkNotNull(jobGraph);
try {
return client
.submitJob(jobGraph)
.thenApply(DetachedJobExecutionResult::new)
.get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
}
}

public static JobExecutionResult submitJobAndWaitForResult(
ClusterClient<?> client,
JobGraph jobGraph,
ClassLoader classLoader) throws ProgramInvocationException {
checkNotNull(client);
checkNotNull(jobGraph);
checkNotNull(classLoader);

JobResult jobResult;

try {
jobResult = client
.submitJob(jobGraph)
.thenCompose(client::requestJobResult)
.get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
}

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobExecutionException | IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
}

public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -227,8 +226,7 @@ public void shouldSubmitToJobClient() throws Exception {
jobGraph.addJars(Collections.emptyList());
jobGraph.setClasspaths(Collections.emptyList());

JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
assertNotNull(result);
assertNotNull(clusterClient.submitJob(jobGraph).get());
}

/**
Expand Down Expand Up @@ -430,7 +428,7 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
jobGraph.addJars(accessor.getJars());
jobGraph.setClasspaths(accessor.getClasspaths());

final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
final JobID jobID = clusterClient.submitJob(jobGraph).get();
return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
};
}
Expand Down
Loading

0 comments on commit dfb8a3b

Please sign in to comment.