Skip to content

Commit

Permalink
[FLINK-8573] Extend ProgramInvocationException message with Job ID wh…
Browse files Browse the repository at this point in the history
…ere applicable

This closes apache#6169.
This closes apache#5421.
  • Loading branch information
azagrebin authored and tillrohrmann committed Jun 15, 2018
1 parent 009e9fe commit fc3ee68
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws
actorSystem = actorSystemLoader.get();
} catch (FlinkException fe) {
throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
"JobManager.", fe);
"JobManager.", jobGraph.getJobID(), fe);
}

try {
Expand All @@ -497,7 +497,7 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws

return lastJobExecutionResult;
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
}
}

Expand All @@ -516,7 +516,8 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade
try {
jobManagerGateway = getJobManagerGateway();
} catch (Exception e) {
throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.",
jobGraph.getJobID(), e);
}

try {
Expand All @@ -529,7 +530,8 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade
classLoader);
return new JobSubmissionResult(jobGraph.getJobID());
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
jobGraph.getJobID(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);

throw new ProgramInvocationException(
String.format("Could not run job %s in detached mode.", jobGraph.getJobID()),
e);
throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
}
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
Expand All @@ -91,17 +89,15 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);

throw new ProgramInvocationException(
String.format("Could not run job %s.", jobGraph.getJobID()),
e);
throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
}

try {
return jobResult.toJobExecutionResult(classLoader);
} catch (JobResult.WrappedJobException e) {
throw new ProgramInvocationException(e.getCause());
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException(e);
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public JobWithJars getPlanWithoutJars() throws ProgramInvocationException {
return new JobWithJars(getPlan(), Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
} else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
" for a program that is using the interactive mode.", getPlan().getJobId());
}
}

Expand All @@ -309,7 +309,7 @@ public JobWithJars getPlanWithJars() throws ProgramInvocationException {
return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
} else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
" for a program that is using the interactive mode.", getPlan().getJobId());
}
}

Expand Down Expand Up @@ -341,12 +341,12 @@ else if (isUsingInteractiveMode()) {
}
catch (Throwable t) {
// the invocation gets aborted with the preview plan
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
} else if (env.preview != null) {
return env.preview;
} else {
throw new ProgramInvocationException("The program caused an error: ", t);
if (env.previewPlan == null) {
if (env.preview != null) {
return env.preview;
} else {
throw new ProgramInvocationException("The program caused an error: ", getPlan().getJobId(), t);
}
}
}
finally {
Expand All @@ -357,7 +357,8 @@ else if (isUsingInteractiveMode()) {
previewPlan = env.previewPlan;
} else {
throw new ProgramInvocationException(
"The program plan could not be fetched. The program silently swallowed the control flow exceptions.");
"The program plan could not be fetched. The program silently swallowed the control flow exceptions.",
getPlan().getJobId());
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static JobGraph createJobGraph(
try {
jobGraph.addJar(new Path(url.toURI()));
} catch (URISyntaxException e) {
throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', e);
throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.client.program;

import org.apache.flink.api.common.JobID;

/**
* Exception used to indicate that there is an error during the invocation of a Flink program.
*/
Expand All @@ -37,6 +39,18 @@ public ProgramInvocationException(String message) {
super(message);
}

/**
* Creates a <tt>ProgramInvocationException</tt> with the given message which contains job id.
*
* @param message
* The additional message.
* @param jobID
* ID of failed job.
*/
public ProgramInvocationException(String message, JobID jobID) {
super(message + " (JobID: " + jobID + ")");
}

/**
* Creates a <tt>ProgramInvocationException</tt> for the given exception.
*
Expand All @@ -59,4 +73,19 @@ public ProgramInvocationException(Throwable cause) {
public ProgramInvocationException(String message, Throwable cause) {
super(message, cause);
}

/**
* Creates a <tt>ProgramInvocationException</tt> for the given exception with an
* additional message which contains job id.
*
* @param message
* The additional message.
* @param jobID
* ID of failed job.
* @param cause
* The exception that causes the program invocation to fail.
*/
public ProgramInvocationException(String message, JobID jobID, Throwable cause) {
super(message + " (JobID: " + jobID + ")", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
try {
return jobSubmissionFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e));
throw new ProgramInvocationException("Could not submit job",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
Expand All @@ -255,16 +256,17 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
try {
jobResult = jobResultFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
throw new ProgramInvocationException("Could not retrieve the execution result.",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}

try {
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobResult.WrappedJobException we) {
throw new ProgramInvocationException(we.getCause());
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), we.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException(e);
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(JarActionHandl
graph.addJar(new Path(jar.toURI()));
}
catch (URISyntaxException e) {
throw new ProgramInvocationException("Invalid jar path. Unexpected error. :(");
throw new ProgramInvocationException("Invalid jar path. Unexpected error. :(", graph.getJobID());
}
}
return Tuple2.of(graph, classLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
try {
jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e) {
throw new ProgramInvocationException("Could not write the user code classes to disk.", e);
throw new ProgramInvocationException("Could not write the user code classes to disk.",
streamGraph.getJobGraph().getJobID(), e);
}

List<URL> allJarFiles = new ArrayList<>(jarFiles.size() + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
}
}
catch (Exception e) {
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
streamGraph.getJobGraph().getJobID(), e);
}

client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
Expand All @@ -223,7 +224,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
}
catch (Exception e) {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
throw new ProgramInvocationException("The program execution failed" + term,
streamGraph.getJobGraph().getJobID(), e);
}
finally {
try {
Expand Down

0 comments on commit fc3ee68

Please sign in to comment.