diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index b5be5c4151237..b89a7c43aac7f 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -20,7 +20,17 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramRetriever; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; @@ -29,14 +39,16 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.FlinkException; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; +import java.net.URL; import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -50,40 +62,23 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { public static final JobID ZERO_JOB_ID = new JobID(0, 0); - @Nonnull - private final JobID jobId; - - @Nonnull - private final SavepointRestoreSettings savepointRestoreSettings; - - @Nonnull - private final String[] programArguments; - - @Nullable - private final String jobClassName; + private final PackagedProgram program; private StandaloneJobClusterEntryPoint( - Configuration configuration, - @Nonnull JobID jobId, - @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments, - @Nullable String jobClassName) { + final Configuration configuration, + final PackagedProgram program) { super(configuration); - this.jobId = requireNonNull(jobId, "jobId"); - this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); - this.programArguments = requireNonNull(programArguments, "programArguments"); - this.jobClassName = jobClassName; + this.program = requireNonNull(program); } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { - final ClassPathJobGraphRetriever.Builder classPathJobGraphRetrieverBuilder = ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, programArguments) - .setJobClassName(jobClassName); - tryFindUserLibDirectory().ifPresent(classPathJobGraphRetrieverBuilder::setUserLibDirectory); - - return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( - StandaloneResourceManagerFactory.INSTANCE, - classPathJobGraphRetrieverBuilder.build()); + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + ApplicationDispatcherLeaderProcessFactoryFactory + .create(configuration, SessionDispatcherFactory.INSTANCE, program)), + StandaloneResourceManagerFactory.INSTANCE, + JobRestEndpointFactory.INSTANCE); } public static void main(String[] args) { @@ -93,8 +88,8 @@ public static void main(String[] args) { JvmShutdownSafeguard.installAsShutdownHook(LOG); final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); - StandaloneJobClusterConfiguration clusterConfiguration = null; + StandaloneJobClusterConfiguration clusterConfiguration = null; try { clusterConfiguration = commandLineParser.parse(args); } catch (Exception e) { @@ -103,26 +98,61 @@ public static void main(String[] args) { System.exit(1); } - Configuration configuration = loadConfiguration(clusterConfiguration); - setDefaultExecutionModeIfNotConfigured(configuration); + PackagedProgram program = null; + try { + program = getPackagedProgram(clusterConfiguration); + } catch (Exception e) { + LOG.error("Could not create application program.", e); + System.exit(1); + } + + Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration); + configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); - StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint( - configuration, - resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration), - clusterConfiguration.getSavepointRestoreSettings(), - clusterConfiguration.getArgs(), - clusterConfiguration.getJobClassName()); + StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, program); ClusterEntrypoint.runClusterEntrypoint(entrypoint); } @VisibleForTesting - @Nonnull + static Configuration loadConfigurationFromClusterConfig(StandaloneJobClusterConfiguration clusterConfiguration) { + Configuration configuration = loadConfiguration(clusterConfiguration); + setStaticJobId(clusterConfiguration, configuration); + SavepointRestoreSettings.toConfiguration(clusterConfiguration.getSavepointRestoreSettings(), configuration); + return configuration; + } + + private static PackagedProgram getPackagedProgram( + final StandaloneJobClusterConfiguration clusterConfiguration) throws IOException, FlinkException { + final PackagedProgramRetriever programRetriever = getPackagedProgramRetriever( + clusterConfiguration.getArgs(), + clusterConfiguration.getJobClassName()); + return programRetriever.getPackagedProgram(); + } + + private static PackagedProgramRetriever getPackagedProgramRetriever( + final String[] programArguments, + @Nullable final String jobClassName) throws IOException { + final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = + ClassPathPackagedProgramRetriever + .newBuilder(programArguments) + .setJobClassName(jobClassName); + tryFindUserLibDirectory().ifPresent(retrieverBuilder::setUserLibDirectory); + return retrieverBuilder.build(); + } + + private static void setStaticJobId(StandaloneJobClusterConfiguration clusterConfiguration, Configuration configuration) { + final JobID jobId = resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration); + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + } + + @VisibleForTesting static JobID resolveJobIdForCluster(Optional optionalJobID, Configuration configuration) { return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration)); } - @Nonnull private static JobID createJobIdForCluster(Configuration globalConfiguration) { if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) { return ZERO_JOB_ID; @@ -130,16 +160,4 @@ private static JobID createJobIdForCluster(Configuration globalConfiguration) { return JobID.generate(); } } - - @VisibleForTesting - static void setDefaultExecutionModeIfNotConfigured(Configuration configuration) { - if (isNoExecutionModeConfigured(configuration)) { - // In contrast to other places, the default for standalone job clusters is ExecutionMode.DETACHED - configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); - } - } - - private static boolean isNoExecutionModeConfigured(Configuration configuration) { - return configuration.getString(ClusterEntrypoint.EXECUTION_MODE, null) == null; - } } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java index 4a72d3a25e926..e2a74736a2165 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -19,7 +19,11 @@ package org.apache.flink.container.entrypoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.entrypoint.FlinkParseException; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -67,6 +71,57 @@ public void createEmptyFlinkConfiguration() throws IOException { private static final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); private static final String JOB_CLASS_NAME = "foobar"; + @Test + public void testEntrypointClusterConfigurationToConfigurationParsing() throws FlinkParseException { + final JobID jobID = JobID.generate(); + final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("/test/savepoint/path", true); + final String key = DeploymentOptions.TARGET.key(); + final String value = "testDynamicExecutorConfig"; + final int restPort = 1234; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = { + "--configDir", confDirPath, + "--job-id", jobID.toHexString(), + "--fromSavepoint", savepointRestoreSettings.getRestorePath(), + "--allowNonRestoredState", + "--webui-port", String.valueOf(restPort), + "--job-classname", JOB_CLASS_NAME, + String.format("-D%s=%s", key, value), + arg1, arg2}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME))); + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + + final Configuration configuration = StandaloneJobClusterEntryPoint + .loadConfigurationFromClusterConfig(clusterConfiguration); + + final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); + assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID))); + assertThat(SavepointRestoreSettings.fromConfiguration(configuration), is(equalTo(savepointRestoreSettings))); + + assertThat(configuration.get(RestOptions.PORT), is(equalTo(restPort))); + assertThat(configuration.get(DeploymentOptions.TARGET), is(equalTo(value))); + } + + @Test + public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing() throws FlinkParseException { + final JobID jobID = JobID.generate(); + final String[] args = { + "-c", confDirPath, + "--job-id", jobID.toHexString() + }; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + final Configuration configuration = StandaloneJobClusterEntryPoint + .loadConfigurationFromClusterConfig(clusterConfiguration); + + final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); + assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID))); + assertThat(SavepointRestoreSettings.fromConfiguration(configuration), is(equalTo(SavepointRestoreSettings.none()))); + } + @Test public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { final String key = "key"; diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java index 26fb798e8bc46..140ff28ac0489 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -21,8 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.ClusterEntrypoint.ExecutionMode; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -30,7 +28,6 @@ import java.util.Optional; import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsNot.not; @@ -40,32 +37,6 @@ */ public class StandaloneJobClusterEntryPointTest extends TestLogger { - /** - * Tests that the default {@link ExecutionMode} is {@link ExecutionMode#DETACHED}. - */ - @Test - public void testDefaultExecutionModeIsDetached() { - Configuration configuration = new Configuration(); - - StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration); - - assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.DETACHED)); - } - - /** - * Tests that {@link ExecutionMode} is not overwritten if provided. - */ - @Test - public void testDontOverwriteExecutionMode() { - Configuration configuration = new Configuration(); - setExecutionMode(configuration, ExecutionMode.NORMAL); - - StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration); - - // Don't overwrite provided configuration - assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.NORMAL)); - } - @Test public void configuredJobIDTakesPrecedenceWithHA() { Optional jobID = Optional.of(JobID.generate()); @@ -120,14 +91,6 @@ public void jobIDdefaultsToRandomJobIDWithoutHA() { assertThat(jobIdForCluster, is(not(ZERO_JOB_ID))); } - private static void setExecutionMode(Configuration configuration, ExecutionMode executionMode) { - configuration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); - } - - private static ExecutionMode getExecutionMode(Configuration configuration) { - return ExecutionMode.valueOf(configuration.getString(ClusterEntrypoint.EXECUTION_MODE)); - } - private static void enableHighAvailability(final Configuration configuration) { configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java index 84ba3f49f5708..d08b9d48fe8da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java @@ -35,7 +35,7 @@ public class DefaultDispatcherRunnerFactory implements DispatcherRunnerFactory { private final DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory; - private DefaultDispatcherRunnerFactory(DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory) { + public DefaultDispatcherRunnerFactory(DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory) { this.dispatcherLeaderProcessFactoryFactory = dispatcherLeaderProcessFactoryFactory; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index 55ecc96e227c1..e1d1296a789a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -89,7 +89,7 @@ public class DefaultDispatcherResourceManagerComponentFactory implements Dispatc @Nonnull private final RestEndpointFactory restEndpointFactory; - DefaultDispatcherResourceManagerComponentFactory( + public DefaultDispatcherResourceManagerComponentFactory( @Nonnull DispatcherRunnerFactory dispatcherRunnerFactory, @Nonnull ResourceManagerFactory resourceManagerFactory, @Nonnull RestEndpointFactory restEndpointFactory) {