diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 94fc109c47be3..59ab40658048d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -39,18 +40,21 @@ public class PackagedProgramUtils { /** - * Creates a {@link JobGraph} from the given {@link PackagedProgram}. + * Creates a {@link JobGraph} with a specified {@link JobID} + * from the given {@link PackagedProgram}. * * @param packagedProgram to extract the JobGraph from * @param configuration to use for the optimizer and job graph generator * @param defaultParallelism for the JobGraph + * @param jobID the pre-generated job id * @return JobGraph extracted from the PackagedProgram * @throws ProgramInvocationException if the JobGraph generation failed */ public static JobGraph createJobGraph( PackagedProgram packagedProgram, Configuration configuration, - int defaultParallelism) throws ProgramInvocationException { + int defaultParallelism, + JobID jobID) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); final FlinkPlan flinkPlan; @@ -79,11 +83,11 @@ public static JobGraph createJobGraph( final JobGraph jobGraph; if (flinkPlan instanceof StreamingPlan) { - jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(); + jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID); jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); } else { final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration); - jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan); + jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID); } for (URL url : packagedProgram.getAllLibraries()) { @@ -99,5 +103,22 @@ public static JobGraph createJobGraph( return jobGraph; } + /** + * Creates a {@link JobGraph} with a random {@link JobID} + * from the given {@link PackagedProgram}. + * + * @param packagedProgram to extract the JobGraph from + * @param configuration to use for the optimizer and job graph generator + * @param defaultParallelism for the JobGraph + * @return JobGraph extracted from the PackagedProgram + * @throws ProgramInvocationException if the JobGraph generation failed + */ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism) throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null); + } + private PackagedProgramUtils() {} } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 3e0645d685965..d769b6848e7c1 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; @@ -45,6 +46,8 @@ public class ClassPathJobGraphRetriever implements JobGraphRetriever { @Nonnull private final String[] programArguments; + public static final JobID FIXED_JOB_ID = new JobID(0, 0); + public ClassPathJobGraphRetriever( @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings, @@ -59,7 +62,11 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept final PackagedProgram packagedProgram = createPackagedProgram(); final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); try { - final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism); + final JobGraph jobGraph = PackagedProgramUtils.createJobGraph( + packagedProgram, + configuration, + defaultParallelism, + FIXED_JOB_ID); jobGraph.setAllowQueuedScheduling(true); jobGraph.setSavepointRestoreSettings(savepointRestoreSettings); diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java index 6e460e1f894db..83696e37c9044 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java @@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** @@ -53,6 +54,7 @@ public void testJobGraphRetrieval() throws FlinkException { assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); + assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID); } @Test @@ -68,5 +70,6 @@ public void testSavepointRestoreSettings() throws FlinkException { final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings))); + assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID); } } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java index 880f2e3d5db41..764134f391e62 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java @@ -21,15 +21,29 @@ import java.io.File; import java.io.IOException; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; +import javax.annotation.Nullable; + /** * Abstract class representing Flink Streaming plans * */ public abstract class StreamingPlan implements FlinkPlan { - public abstract JobGraph getJobGraph(); + /** + * Gets the assembled {@link JobGraph} with a random {@link JobID}. + */ + @SuppressWarnings("deprecation") + public JobGraph getJobGraph() { + return getJobGraph(null); + } + + /** + * Gets the assembled {@link JobGraph} with a specified {@link JobID}. + */ + public abstract JobGraph getJobGraph(@Nullable JobID jobID); public abstract String getStreamingPlanAsJSON(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 01768ad6df7f8..46a4ce2211217 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -652,19 +653,20 @@ private void removeVertex(StreamNode toRemove) { } /** - * Gets the assembled {@link JobGraph}. + * Gets the assembled {@link JobGraph} with a given job id. */ @SuppressWarnings("deprecation") - public JobGraph getJobGraph() { + @Override + public JobGraph getJobGraph(@Nullable JobID jobID) { // temporarily forbid checkpointing for iterative jobs if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) { throw new UnsupportedOperationException( - "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " - + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " - + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); + "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " + + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " + + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } - return StreamingJobGraphGenerator.createJobGraph(this); + return StreamingJobGraphGenerator.createJobGraph(this, jobID); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 0ce522509209a..69213024975e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; @@ -62,6 +63,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -83,7 +86,11 @@ public class StreamingJobGraphGenerator { // ------------------------------------------------------------------------ public static JobGraph createJobGraph(StreamGraph streamGraph) { - return new StreamingJobGraphGenerator(streamGraph).createJobGraph(); + return createJobGraph(streamGraph, null); + } + + public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) { + return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); } // ------------------------------------------------------------------------ @@ -108,6 +115,10 @@ public static JobGraph createJobGraph(StreamGraph streamGraph) { private final List legacyStreamGraphHashers; private StreamingJobGraphGenerator(StreamGraph streamGraph) { + this(streamGraph, null); + } + + private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); @@ -121,7 +132,7 @@ private StreamingJobGraphGenerator(StreamGraph streamGraph) { this.chainedPreferredResources = new HashMap<>(); this.physicalEdgesInOrder = new ArrayList<>(); - jobGraph = new JobGraph(streamGraph.getJobName()); + jobGraph = new JobGraph(jobID, streamGraph.getJobName()); } private JobGraph createJobGraph() {