diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index ac779a7faa2f6..c082b10f1c8bf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -481,7 +481,7 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws actorSystem = actorSystemLoader.get(); } catch (FlinkException fe) { throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " + - "JobManager.", fe); + "JobManager.", jobGraph.getJobID(), fe); } try { @@ -497,7 +497,7 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws return lastJobExecutionResult; } catch (JobExecutionException e) { - throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e); } } @@ -516,7 +516,8 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade try { jobManagerGateway = getJobManagerGateway(); } catch (Exception e) { - throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e); + throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", + jobGraph.getJobID(), e); } try { @@ -529,7 +530,8 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade classLoader); return new JobSubmissionResult(jobGraph.getJobID()); } catch (JobExecutionException e) { - throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), + jobGraph.getJobID(), e); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index faf96ec296529..81cf784441dc6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -77,9 +77,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) } catch (InterruptedException | ExecutionException e) { ExceptionUtils.checkInterrupted(e); - throw new ProgramInvocationException( - String.format("Could not run job %s in detached mode.", jobGraph.getJobID()), - e); + throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e); } } else { final CompletableFuture jobResultFuture = jobSubmissionResultFuture.thenCompose( @@ -91,17 +89,15 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) } catch (InterruptedException | ExecutionException e) { ExceptionUtils.checkInterrupted(e); - throw new ProgramInvocationException( - String.format("Could not run job %s.", jobGraph.getJobID()), - e); + throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e); } try { return jobResult.toJobExecutionResult(classLoader); } catch (JobResult.WrappedJobException e) { - throw new ProgramInvocationException(e.getCause()); + throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e.getCause()); } catch (IOException | ClassNotFoundException e) { - throw new ProgramInvocationException(e); + throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index d81cacbb4f44d..fbc818701ca10 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -294,7 +294,7 @@ public JobWithJars getPlanWithoutJars() throws ProgramInvocationException { return new JobWithJars(getPlan(), Collections.emptyList(), classpaths, userCodeClassLoader); } else { throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + - " for a program that is using the interactive mode."); + " for a program that is using the interactive mode.", getPlan().getJobId()); } } @@ -309,7 +309,7 @@ public JobWithJars getPlanWithJars() throws ProgramInvocationException { return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader); } else { throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() + - " for a program that is using the interactive mode."); + " for a program that is using the interactive mode.", getPlan().getJobId()); } } @@ -341,12 +341,12 @@ else if (isUsingInteractiveMode()) { } catch (Throwable t) { // the invocation gets aborted with the preview plan - if (env.previewPlan != null) { - previewPlan = env.previewPlan; - } else if (env.preview != null) { - return env.preview; - } else { - throw new ProgramInvocationException("The program caused an error: ", t); + if (env.previewPlan == null) { + if (env.preview != null) { + return env.preview; + } else { + throw new ProgramInvocationException("The program caused an error: ", getPlan().getJobId(), t); + } } } finally { @@ -357,7 +357,8 @@ else if (isUsingInteractiveMode()) { previewPlan = env.previewPlan; } else { throw new ProgramInvocationException( - "The program plan could not be fetched. The program silently swallowed the control flow exceptions."); + "The program plan could not be fetched. The program silently swallowed the control flow exceptions.", + getPlan().getJobId()); } } else { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 5765d1ffa5a1e..94fc109c47be3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -90,7 +90,7 @@ public static JobGraph createJobGraph( try { jobGraph.addJar(new Path(url.toURI())); } catch (URISyntaxException e) { - throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', e); + throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java index ee58227fadf6e..ac4e3ce2b6962 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java @@ -18,6 +18,8 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.JobID; + /** * Exception used to indicate that there is an error during the invocation of a Flink program. */ @@ -37,6 +39,18 @@ public ProgramInvocationException(String message) { super(message); } + /** + * Creates a ProgramInvocationException with the given message which contains job id. + * + * @param message + * The additional message. + * @param jobID + * ID of failed job. + */ + public ProgramInvocationException(String message, JobID jobID) { + super(message + " (JobID: " + jobID + ")"); + } + /** * Creates a ProgramInvocationException for the given exception. * @@ -59,4 +73,19 @@ public ProgramInvocationException(Throwable cause) { public ProgramInvocationException(String message, Throwable cause) { super(message, cause); } + + /** + * Creates a ProgramInvocationException for the given exception with an + * additional message which contains job id. + * + * @param message + * The additional message. + * @param jobID + * ID of failed job. + * @param cause + * The exception that causes the program invocation to fail. + */ + public ProgramInvocationException(String message, JobID jobID, Throwable cause) { + super(message + " (JobID: " + jobID + ")", cause); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 7466e3254809e..8eb4ec0dab04b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -245,7 +245,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) try { return jobSubmissionFuture.get(); } catch (Exception e) { - throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e)); + throw new ProgramInvocationException("Could not submit job", + jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e)); } } else { final CompletableFuture jobResultFuture = jobSubmissionFuture.thenCompose( @@ -255,16 +256,17 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) try { jobResult = jobResultFuture.get(); } catch (Exception e) { - throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e)); + throw new ProgramInvocationException("Could not retrieve the execution result.", + jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e)); } try { this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader); return lastJobExecutionResult; } catch (JobResult.WrappedJobException we) { - throw new ProgramInvocationException(we.getCause()); + throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), we.getCause()); } catch (IOException | ClassNotFoundException e) { - throw new ProgramInvocationException(e); + throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e); } } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/legacy/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/legacy/JarActionHandler.java index b63188a3ad38f..e95c925c25730 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/legacy/JarActionHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/legacy/JarActionHandler.java @@ -98,7 +98,7 @@ protected Tuple2 getJobGraphAndClassLoader(JarActionHandl graph.addJar(new Path(jar.toURI())); } catch (URISyntaxException e) { - throw new ProgramInvocationException("Invalid jar path. Unexpected error. :("); + throw new ProgramInvocationException("Invalid jar path. Unexpected error. :(", graph.getJobID()); } } return Tuple2.of(graph, classLoader); diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java index 22b280f890e7d..1578b2ab4d20b 100644 --- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java +++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java @@ -83,7 +83,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List try { jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); } catch (MalformedURLException e) { - throw new ProgramInvocationException("Could not write the user code classes to disk.", e); + throw new ProgramInvocationException("Could not write the user code classes to disk.", + streamGraph.getJobGraph().getJobID(), e); } List allJarFiles = new ArrayList<>(jarFiles.size() + 1); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 075a3cd0caa2f..480f981bc75f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -210,7 +210,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List } } catch (Exception e) { - throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e); + throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), + streamGraph.getJobGraph().getJobID(), e); } client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); @@ -223,7 +224,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List } catch (Exception e) { String term = e.getMessage() == null ? "." : (": " + e.getMessage()); - throw new ProgramInvocationException("The program execution failed" + term, e); + throw new ProgramInvocationException("The program execution failed" + term, + streamGraph.getJobGraph().getJobID(), e); } finally { try {