diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index f9ffa213885ea..bbbf3c5e074fa 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -28,6 +28,7 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -36,6 +37,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherBootstrap; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.util.SerializedThrowable; @@ -72,6 +74,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class); + public static final JobID ZERO_JOB_ID = new JobID(0, 0); + private final PackagedProgram application; private final Collection recoveredJobs; @@ -127,7 +131,7 @@ CompletableFuture runApplicationAndShutdownClusterAsync( final DispatcherGateway dispatcher, final ScheduledExecutor scheduledExecutor) { - applicationCompletionFuture = runApplicationAsync(dispatcher, scheduledExecutor, false); + applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor); return applicationCompletionFuture .handle((r, t) -> { @@ -144,13 +148,30 @@ CompletableFuture runApplicationAndShutdownClusterAsync( .thenCompose(Function.identity()); } + @VisibleForTesting + CompletableFuture fixJobIdAndRunApplicationAsync( + final DispatcherGateway dispatcher, + final ScheduledExecutor scheduledExecutor) { + + final Optional configuredJobId = + configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); + + if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration) && !configuredJobId.isPresent()) { + return runApplicationAsync(dispatcher, scheduledExecutor, false); + } + + if (!configuredJobId.isPresent()) { + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, ZERO_JOB_ID.toHexString()); + } + return runApplicationAsync(dispatcher, scheduledExecutor, true); + } + /** * Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}. * The returned {@link CompletableFuture} completes when all jobs of the user application * succeeded. if any of them fails, or if job submission fails. */ - @VisibleForTesting - CompletableFuture runApplicationAsync( + private CompletableFuture runApplicationAsync( final DispatcherGateway dispatcher, final ScheduledExecutor scheduledExecutor, final boolean enforceSingleJobExecution) { @@ -159,7 +180,7 @@ CompletableFuture runApplicationAsync( // we need to hand in a future as return value because we need to get those JobIs out // from the scheduled task that executes the user program applicationExecutionTask = scheduledExecutor.schedule( - () -> runApplicationEntrypoint( + () -> runApplicationEntryPoint( applicationExecutionFuture, dispatcher, scheduledExecutor, @@ -177,7 +198,7 @@ CompletableFuture runApplicationAsync( * *

This should be executed in a separate thread (or task). */ - private void runApplicationEntrypoint( + private void runApplicationEntryPoint( final CompletableFuture> jobIdsFuture, final DispatcherGateway dispatcher, final ScheduledExecutor scheduledExecutor, diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java index 96317c89c716a..cc52abd025123 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java @@ -26,11 +26,14 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; @@ -82,22 +85,123 @@ public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())); final CompletableFuture applicationFuture = - runApplication(dispatcherBuilder, 0, false); + runApplication(dispatcherBuilder, 0); assertException(applicationFuture, ApplicationExecutionException.class); } @Test - public void testExceptionThrownWithMultiJobApplicationIfOnlyOneJobIsAllowed() throws Throwable { - final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() - .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())); + public void testOnlyOneJobIsAllowedWithHa() throws Throwable { + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); final CompletableFuture applicationFuture = - runApplication(dispatcherBuilder, 3, true); + runApplication(configurationUnderTest, 2); assertException(applicationFuture, FlinkRuntimeException.class); } + @Test + public void testOnlyOneJobAllowedWithStaticJobId() throws Throwable { + final JobID testJobID = new JobID(0, 2); + + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + + final CompletableFuture applicationFuture = + runApplication(configurationUnderTest, 2); + + assertException(applicationFuture, FlinkRuntimeException.class); + } + + @Test + public void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable { + final JobID testJobID = new JobID(0, 2); + + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + configurationUnderTest.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + + final CompletableFuture applicationFuture = + runApplication(configurationUnderTest, 2); + + assertException(applicationFuture, FlinkRuntimeException.class); + } + + @Test + public void testJobIdDefaultsToZeroWithHa() throws Throwable { + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + + final CompletableFuture submittedJobId = new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() + .setSubmitFunction(jobGraph -> { + submittedJobId.complete(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) + .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); + + final CompletableFuture applicationFuture = + runApplication(dispatcherBuilder, configurationUnderTest, 1); + + applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 0L))); + } + + @Test + public void testStaticJobId() throws Throwable { + final JobID testJobID = new JobID(0, 2); + + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + + final CompletableFuture submittedJobId = new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() + .setSubmitFunction(jobGraph -> { + submittedJobId.complete(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) + .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); + + final CompletableFuture applicationFuture = + runApplication(dispatcherBuilder, configurationUnderTest, 1); + + applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 2L))); + } + + @Test + public void testStaticJobIdWithHa() throws Throwable { + final JobID testJobID = new JobID(0, 2); + + final Configuration configurationUnderTest = getConfiguration(); + configurationUnderTest.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString()); + configurationUnderTest.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + + final CompletableFuture submittedJobId = new CompletableFuture<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() + .setSubmitFunction(jobGraph -> { + submittedJobId.complete(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) + .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); + + final CompletableFuture applicationFuture = + runApplication(dispatcherBuilder, configurationUnderTest, 1); + + applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertThat(submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 2L))); + } + @Test public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable { final ConcurrentLinkedDeque submittedJobIds = new ConcurrentLinkedDeque<>(); @@ -126,7 +230,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable { return new CompletableFuture<>(); }); - final CompletableFuture applicationFuture = runApplication(dispatcherBuilder, 2, false); + final CompletableFuture applicationFuture = runApplication(dispatcherBuilder, 2); assertException(applicationFuture, JobExecutionException.class); } @@ -138,7 +242,7 @@ public void testApplicationSucceedsWhenAllJobsSucceed() throws Exception { .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); - final CompletableFuture applicationFuture = runApplication(dispatcherBuilder, 3, false); + final CompletableFuture applicationFuture = runApplication(dispatcherBuilder, 3); // this would block indefinitely if the applications don't finish applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -300,19 +404,37 @@ public void testClusterShutdownWhenApplicationFails() throws Exception { private CompletableFuture runApplication( TestingDispatcherGateway.Builder dispatcherBuilder, - int noOfJobs, - boolean enforceSingleJobExecution) throws FlinkException { + int noOfJobs) throws FlinkException { + + return runApplication(dispatcherBuilder, getConfiguration(), noOfJobs); + } + + private CompletableFuture runApplication( + final Configuration configuration, + final int noOfJobs) throws Throwable { + + final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() + .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) + .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) + .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); + + return runApplication(dispatcherBuilder, configuration, noOfJobs); + } + + private CompletableFuture runApplication( + TestingDispatcherGateway.Builder dispatcherBuilder, + Configuration configuration, + int noOfJobs) throws FlinkException { final PackagedProgram program = getProgram(noOfJobs); final ApplicationDispatcherBootstrap bootstrap = new ApplicationDispatcherBootstrap( - program, Collections.emptyList(), getConfiguration()); + program, Collections.emptyList(), configuration); - return bootstrap.runApplicationAsync( + return bootstrap.fixJobIdAndRunApplicationAsync( dispatcherBuilder.build(), - scheduledExecutor, - enforceSingleJobExecution); + scheduledExecutor); } private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException { diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 43ac61318efa6..ca10d57d7ffbb 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -45,7 +44,6 @@ import java.io.IOException; import java.net.URL; -import java.util.Optional; import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory; @@ -56,8 +54,6 @@ @Internal public final class StandaloneJobClusterEntryPoint extends ApplicationClusterEntryPoint { - public static final JobID ZERO_JOB_ID = new JobID(0, 0); - private StandaloneJobClusterEntryPoint( final Configuration configuration, final PackagedProgram program) { @@ -127,20 +123,9 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( } private static void setStaticJobId(StandaloneJobClusterConfiguration clusterConfiguration, Configuration configuration) { - final JobID jobId = resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration); - configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); - } - - @VisibleForTesting - static JobID resolveJobIdForCluster(Optional optionalJobID, Configuration configuration) { - return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration)); - } - - private static JobID createJobIdForCluster(Configuration globalConfiguration) { - if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) { - return ZERO_JOB_ID; - } else { - return JobID.generate(); + final JobID jobId = clusterConfiguration.getJobId(); + if (jobId != null) { + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); } } } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java deleted file mode 100644 index 140ff28ac0489..0000000000000 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.container.entrypoint; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.Optional; - -import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNot.not; - -/** - * Tests for the {@link StandaloneJobClusterEntryPoint}. - */ -public class StandaloneJobClusterEntryPointTest extends TestLogger { - - @Test - public void configuredJobIDTakesPrecedenceWithHA() { - Optional jobID = Optional.of(JobID.generate()); - - Configuration globalConfiguration = new Configuration(); - enableHighAvailability(globalConfiguration); - - JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, - globalConfiguration); - - assertThat(jobIdForCluster, is(jobID.get())); - } - - @Test - public void configuredJobIDTakesPrecedenceWithoutHA() { - Optional jobID = Optional.of(JobID.generate()); - - Configuration globalConfiguration = new Configuration(); - - JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, - globalConfiguration); - - assertThat(jobIdForCluster, is(jobID.get())); - } - - @Test - public void jobIDdefaultsToZeroWithHA() { - Optional jobID = Optional.empty(); - - Configuration globalConfiguration = new Configuration(); - enableHighAvailability(globalConfiguration); - - JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, - globalConfiguration); - - assertThat(jobIdForCluster, is(ZERO_JOB_ID)); - } - - @Test - public void jobIDdefaultsToRandomJobIDWithoutHA() { - Optional jobID = Optional.empty(); - - Configuration globalConfiguration = new Configuration(); - - JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, - globalConfiguration); - - assertThat(jobIdForCluster, is(not(ZERO_JOB_ID))); - } - - private static void enableHighAvailability(final Configuration configuration) { - configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - } -} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java index 73f889ad2ac11..8dec082c2dce6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java @@ -19,7 +19,6 @@ package org.apache.flink.yarn.entrypoint; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; @@ -30,9 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; @@ -59,8 +56,6 @@ @Internal public final class YarnApplicationClusterEntryPoint extends ApplicationClusterEntryPoint { - public static final JobID ZERO_JOB_ID = new JobID(0, 0); - private YarnApplicationClusterEntryPoint( final Configuration configuration, final PackagedProgram program) { @@ -97,9 +92,6 @@ public static void main(final String[] args) { System.exit(1); } - final JobID jobId = createJobIdForCluster(configuration); - configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); - configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); @@ -147,12 +139,4 @@ private static Optional getUsrLibDir(final Configuration configuration) { return userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ? userLibDir : Optional.empty(); } - - private static JobID createJobIdForCluster(Configuration globalConfiguration) { - if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) { - return ZERO_JOB_ID; - } else { - return JobID.generate(); - } - } }