Skip to content

Commit

Permalink
[FLINK-2632] [client] Fix web client to respect the class loader of s…
Browse files Browse the repository at this point in the history
…ubmitted jobs

This closes apache#1114
  • Loading branch information
mjsax authored and fhueske committed Sep 10, 2015
1 parent 79a7bb2 commit 9c2eaa8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
Expand Down Expand Up @@ -137,7 +136,7 @@ public class CliFrontend {

private FlinkPlan optimizedPlan;

private JobGraph jobGraph;
private PackagedProgram packagedProgram;

/**
*
Expand Down Expand Up @@ -401,7 +400,7 @@ protected int info(String[] args) {

if (webFrontend) {
this.optimizedPlan = flinkPlan;
this.jobGraph = client.getJobGraph(program, flinkPlan);
this.packagedProgram = program;
} else {
String jsonPlan = new PlanJSONDumpGenerator()
.getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
Expand Down Expand Up @@ -980,8 +979,8 @@ public FlinkPlan getFlinkPlan() {
return this.optimizedPlan;
}

public JobGraph getJobGraph() {
return this.jobGraph;
public PackagedProgram getPackagedProgram() {
return this.packagedProgram;
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,13 +87,13 @@ public class JobSubmissionServlet extends HttpServlet {

// ------------------------------------------------------------------------

private final File jobStoreDirectory; // the directory containing the uploaded jobs
private final File jobStoreDirectory; // the directory containing the uploaded jobs

private final File planDumpDirectory; // the directory to dump the optimizer plans to
private final File planDumpDirectory; // the directory to dump the optimizer plans to

private final Map<Long, JobGraph> submittedJobs; // map from UIDs to the running jobs
private final Map<Long, Tuple2<PackagedProgram, FlinkPlan>> submittedJobs; // map from UIDs to the submitted jobs

private final Random rand; // random number generator for UID
private final Random rand; // random number generator for UID

private final CliFrontend cli;

Expand All @@ -103,7 +104,7 @@ public JobSubmissionServlet(CliFrontend cli, File jobDir, File planDir) {
this.jobStoreDirectory = jobDir;
this.planDumpDirectory = planDir;

this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, Tuple2<PackagedProgram, FlinkPlan>>());

this.rand = new Random(System.currentTimeMillis());
}
Expand Down Expand Up @@ -263,7 +264,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
}
}
else {
this.submittedJobs.put(uid, this.cli.getJobGraph());
this.submittedJobs.put(uid, new Tuple2<PackagedProgram, FlinkPlan>(this.cli.getPackagedProgram(), optPlan));
}

// redirect to the plan display page
Expand Down Expand Up @@ -304,7 +305,7 @@ else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
}

// get the retained job
JobGraph job = submittedJobs.remove(uid);
Tuple2<PackagedProgram, FlinkPlan> job = submittedJobs.remove(uid);
if (job == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"No job with the given uid was retained for later submission.");
Expand All @@ -313,8 +314,8 @@ else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {

// submit the job
try {
Client client = new Client(GlobalConfiguration.getConfiguration(), getClass().getClassLoader());
client.run(job, false);
Client client = new Client(GlobalConfiguration.getConfiguration(), job.f0.getUserCodeClassLoader());
client.run(client.getJobGraph(job.f0, job.f1), false);
}
catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex);
Expand Down

0 comments on commit 9c2eaa8

Please sign in to comment.