From 45be6711b0b00ec851e32f255f2cbfe421762b05 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 7 Feb 2019 10:50:26 +0100 Subject: [PATCH] [FLINK-11545] [container] Pass job ID to ClassPathJobGraphRetriever --- .../ClassPathJobGraphRetriever.java | 20 +++++++++++++------ .../StandaloneJobClusterEntryPoint.java | 11 ++++++++-- .../ClassPathJobGraphRetrieverTest.java | 11 +++++++--- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 79e295f0ab574..5554168ca07f5 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; @@ -30,28 +31,35 @@ import javax.annotation.Nonnull; +import static java.util.Objects.requireNonNull; + /** * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class * on the class path. */ -public class ClassPathJobGraphRetriever implements JobGraphRetriever { +class ClassPathJobGraphRetriever implements JobGraphRetriever { @Nonnull private final String jobClassName; + @Nonnull + private final JobID jobId; + @Nonnull private final SavepointRestoreSettings savepointRestoreSettings; @Nonnull private final String[] programArguments; - public ClassPathJobGraphRetriever( + ClassPathJobGraphRetriever( @Nonnull String jobClassName, + @Nonnull JobID jobId, @Nonnull SavepointRestoreSettings savepointRestoreSettings, @Nonnull String[] programArguments) { - this.jobClassName = jobClassName; - this.savepointRestoreSettings = savepointRestoreSettings; - this.programArguments = programArguments; + this.jobClassName = requireNonNull(jobClassName, "jobClassName"); + this.jobId = requireNonNull(jobId, "jobId"); + this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); + this.programArguments = requireNonNull(programArguments, "programArguments"); } @Override @@ -63,7 +71,7 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept packagedProgram, configuration, defaultParallelism, - StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID); + jobId); jobGraph.setAllowQueuedScheduling(true); jobGraph.setSavepointRestoreSettings(savepointRestoreSettings); diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index c2cb906709bf3..aaef7712749a5 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.FlinkParseException; @@ -44,19 +45,24 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { @Nonnull private final String jobClassName; + @Nonnull + private final JobID jobId; + @Nonnull private final SavepointRestoreSettings savepointRestoreSettings; @Nonnull private final String[] programArguments; - StandaloneJobClusterEntryPoint( + private StandaloneJobClusterEntryPoint( Configuration configuration, @Nonnull String jobClassName, + @Nonnull JobID jobId, @Nonnull SavepointRestoreSettings savepointRestoreSettings, @Nonnull String[] programArguments) { super(configuration); this.jobClassName = requireNonNull(jobClassName, "jobClassName"); + this.jobId = requireNonNull(jobId, "jobId"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.programArguments = requireNonNull(programArguments, "programArguments"); } @@ -65,7 +71,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { return new JobDispatcherResourceManagerComponentFactory( StandaloneResourceManagerFactory.INSTANCE, - new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments)); + new ClassPathJobGraphRetriever(jobClassName, jobId, savepointRestoreSettings, programArguments)); } public static void main(String[] args) { @@ -92,6 +98,7 @@ public static void main(String[] args) { StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint( configuration, clusterConfiguration.getJobClassName(), + clusterConfiguration.getJobId(), clusterConfiguration.getSavepointRestoreSettings(), clusterConfiguration.getArgs()); diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java index 143db746e1a36..cd038b32d7f20 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -37,16 +38,18 @@ */ public class ClassPathJobGraphRetrieverTest extends TestLogger { - public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; + private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; @Test public void testJobGraphRetrieval() throws FlinkException { final int parallelism = 42; final Configuration configuration = new Configuration(); configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); + final JobID jobId = new JobID(); final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( TestJob.class.getCanonicalName(), + jobId, SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS); @@ -54,22 +57,24 @@ public void testJobGraphRetrieval() throws FlinkException { assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); - assertEquals(jobGraph.getJobID(), StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID); + assertEquals(jobGraph.getJobID(), jobId); } @Test public void testSavepointRestoreSettings() throws FlinkException { final Configuration configuration = new Configuration(); final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true); + final JobID jobId = new JobID(); final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( TestJob.class.getCanonicalName(), + jobId, savepointRestoreSettings, PROGRAM_ARGUMENTS); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings))); - assertEquals(jobGraph.getJobID(), StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID); + assertEquals(jobGraph.getJobID(), jobId); } }