diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java index 875a7c5632a0b..36cd17e43e722 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java @@ -37,7 +37,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura @Nonnull private final SavepointRestoreSettings savepointRestoreSettings; - @Nonnull + @Nullable private final JobID jobId; @Nullable @@ -50,11 +50,11 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura @Nullable String hostname, int restPort, @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull JobID jobId, + @Nullable JobID jobId, @Nullable String jobClassName) { super(configDir, dynamicProperties, args, hostname, restPort); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); - this.jobId = requireNonNull(jobId, "jobId"); + this.jobId = jobId; this.jobClassName = jobClassName; } @@ -63,7 +63,7 @@ SavepointRestoreSettings getSavepointRestoreSettings() { return savepointRestoreSettings; } - @Nonnull + @Nullable JobID getJobId() { return jobId; } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java index 16fa63d08aff5..a99e277d35ca8 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -29,6 +29,7 @@ import org.apache.commons.cli.Options; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Properties; @@ -43,8 +44,6 @@ */ public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory { - static final JobID DEFAULT_JOB_ID = new JobID(0, 0); - private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") .longOpt("job-classname") .required(false) @@ -105,10 +104,11 @@ private int getRestPort(CommandLine commandLine) throws FlinkParseException { } } + @Nullable private static JobID getJobId(CommandLine commandLine) throws FlinkParseException { String jobId = commandLine.getOptionValue(JOB_ID_OPTION.getOpt()); if (jobId == null) { - return DEFAULT_JOB_ID; + return null; } try { return JobID.fromHexString(jobId); diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index c69325d1cdf3f..6001ec6fdb712 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -35,6 +36,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Optional; + import static java.util.Objects.requireNonNull; /** @@ -43,6 +46,8 @@ */ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { + public static final JobID ZERO_JOB_ID = new JobID(0, 0); + @Nonnull private final JobID jobId; @@ -97,7 +102,7 @@ public static void main(String[] args) { StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint( configuration, - clusterConfiguration.getJobId(), + resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration), clusterConfiguration.getSavepointRestoreSettings(), clusterConfiguration.getArgs(), clusterConfiguration.getJobClassName()); @@ -105,6 +110,21 @@ public static void main(String[] args) { ClusterEntrypoint.runClusterEntrypoint(entrypoint); } + @VisibleForTesting + @Nonnull + static JobID resolveJobIdForCluster(Optional optionalJobID, Configuration configuration) { + return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration)); + } + + @Nonnull + private static JobID createJobIdForCluster(Configuration globalConfiguration) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) { + return ZERO_JOB_ID; + } else { + return JobID.generate(); + } + } + @VisibleForTesting static void setDefaultExecutionModeIfNotConfigured(Configuration configuration) { if (isNoExecutionModeConfigured(configuration)) { diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java index f74fc8388db27..4a72d3a25e926 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -36,13 +36,11 @@ import java.util.Optional; import java.util.Properties; -import static org.apache.flink.container.entrypoint.StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -91,7 +89,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none()))); - assertThat(clusterConfiguration.getJobId(), is(equalTo(DEFAULT_JOB_ID))); + assertThat(clusterConfiguration.getJobId(), is(nullValue())); } @Test @@ -106,7 +104,7 @@ public void testOnlyRequiredArguments() throws FlinkParseException { assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); assertThat(clusterConfiguration.getHostname(), is(nullValue())); assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none()))); - assertThat(clusterConfiguration.getJobId(), is(not(nullValue()))); + assertThat(clusterConfiguration.getJobId(), is(nullValue())); assertThat(clusterConfiguration.getJobClassName(), is(nullValue())); } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java index 61d37ac59d0fd..26fb798e8bc46 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -18,15 +18,22 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint.ExecutionMode; import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.Optional; + +import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNot.not; /** * Tests for the {@link StandaloneJobClusterEntryPoint}. @@ -59,6 +66,60 @@ public void testDontOverwriteExecutionMode() { assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.NORMAL)); } + @Test + public void configuredJobIDTakesPrecedenceWithHA() { + Optional jobID = Optional.of(JobID.generate()); + + Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobID.get())); + } + + @Test + public void configuredJobIDTakesPrecedenceWithoutHA() { + Optional jobID = Optional.of(JobID.generate()); + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobID.get())); + } + + @Test + public void jobIDdefaultsToZeroWithHA() { + Optional jobID = Optional.empty(); + + Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(ZERO_JOB_ID)); + } + + @Test + public void jobIDdefaultsToRandomJobIDWithoutHA() { + Optional jobID = Optional.empty(); + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(not(ZERO_JOB_ID))); + } + private static void setExecutionMode(Configuration configuration, ExecutionMode executionMode) { configuration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); } @@ -66,4 +127,8 @@ private static void setExecutionMode(Configuration configuration, ExecutionMode private static ExecutionMode getExecutionMode(Configuration configuration) { return ExecutionMode.valueOf(configuration.getString(ClusterEntrypoint.EXECUTION_MODE)); } + + private static void enableHighAvailability(final Configuration configuration) { + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + } }