Skip to content

Commit

Permalink
[FLINK-16658] Wire the ApplicationDispatcherBootstrap to the Standalo…
Browse files Browse the repository at this point in the history
…neJobClusterEntryPoint
  • Loading branch information
kl0u committed Apr 21, 2020
1 parent 52bcb21 commit 74fc5a7
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
Expand All @@ -29,14 +39,16 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URL;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
Expand All @@ -50,40 +62,23 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

@Nonnull
private final JobID jobId;

@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

@Nonnull
private final String[] programArguments;

@Nullable
private final String jobClassName;
private final PackagedProgram program;

private StandaloneJobClusterEntryPoint(
Configuration configuration,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
final Configuration configuration,
final PackagedProgram program) {
super(configuration);
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
this.program = requireNonNull(program);
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
final ClassPathJobGraphRetriever.Builder classPathJobGraphRetrieverBuilder = ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, programArguments)
.setJobClassName(jobClassName);
tryFindUserLibDirectory().ifPresent(classPathJobGraphRetrieverBuilder::setUserLibDirectory);

return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
classPathJobGraphRetrieverBuilder.build());
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new DefaultDispatcherResourceManagerComponentFactory(
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory
.create(configuration, SessionDispatcherFactory.INSTANCE, program)),
StandaloneResourceManagerFactory.INSTANCE,
JobRestEndpointFactory.INSTANCE);
}

public static void main(String[] args) {
Expand All @@ -93,8 +88,8 @@ public static void main(String[] args) {
JvmShutdownSafeguard.installAsShutdownHook(LOG);

final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
StandaloneJobClusterConfiguration clusterConfiguration = null;

StandaloneJobClusterConfiguration clusterConfiguration = null;
try {
clusterConfiguration = commandLineParser.parse(args);
} catch (Exception e) {
Expand All @@ -103,43 +98,66 @@ public static void main(String[] args) {
System.exit(1);
}

Configuration configuration = loadConfiguration(clusterConfiguration);
setDefaultExecutionModeIfNotConfigured(configuration);
PackagedProgram program = null;
try {
program = getPackagedProgram(clusterConfiguration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}

Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);

StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
configuration,
resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());
StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, program);

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

@VisibleForTesting
@Nonnull
static Configuration loadConfigurationFromClusterConfig(StandaloneJobClusterConfiguration clusterConfiguration) {
Configuration configuration = loadConfiguration(clusterConfiguration);
setStaticJobId(clusterConfiguration, configuration);
SavepointRestoreSettings.toConfiguration(clusterConfiguration.getSavepointRestoreSettings(), configuration);
return configuration;
}

private static PackagedProgram getPackagedProgram(
final StandaloneJobClusterConfiguration clusterConfiguration) throws IOException, FlinkException {
final PackagedProgramRetriever programRetriever = getPackagedProgramRetriever(
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());
return programRetriever.getPackagedProgram();
}

private static PackagedProgramRetriever getPackagedProgramRetriever(
final String[] programArguments,
@Nullable final String jobClassName) throws IOException {
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
ClassPathPackagedProgramRetriever
.newBuilder(programArguments)
.setJobClassName(jobClassName);
tryFindUserLibDirectory().ifPresent(retrieverBuilder::setUserLibDirectory);
return retrieverBuilder.build();
}

private static void setStaticJobId(StandaloneJobClusterConfiguration clusterConfiguration, Configuration configuration) {
final JobID jobId = resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration);
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
}

@VisibleForTesting
static JobID resolveJobIdForCluster(Optional<JobID> optionalJobID, Configuration configuration) {
return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration));
}

@Nonnull
private static JobID createJobIdForCluster(Configuration globalConfiguration) {
if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) {
return ZERO_JOB_ID;
} else {
return JobID.generate();
}
}

@VisibleForTesting
static void setDefaultExecutionModeIfNotConfigured(Configuration configuration) {
if (isNoExecutionModeConfigured(configuration)) {
// In contrast to other places, the default for standalone job clusters is ExecutionMode.DETACHED
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
}
}

private static boolean isNoExecutionModeConfigured(Configuration configuration) {
return configuration.getString(ClusterEntrypoint.EXECUTION_MODE, null) == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -67,6 +71,57 @@ public void createEmptyFlinkConfiguration() throws IOException {
private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
private static final String JOB_CLASS_NAME = "foobar";

@Test
public void testEntrypointClusterConfigurationToConfigurationParsing() throws FlinkParseException {
final JobID jobID = JobID.generate();
final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("/test/savepoint/path", true);
final String key = DeploymentOptions.TARGET.key();
final String value = "testDynamicExecutorConfig";
final int restPort = 1234;
final String arg1 = "arg1";
final String arg2 = "arg2";
final String[] args = {
"--configDir", confDirPath,
"--job-id", jobID.toHexString(),
"--fromSavepoint", savepointRestoreSettings.getRestorePath(),
"--allowNonRestoredState",
"--webui-port", String.valueOf(restPort),
"--job-classname", JOB_CLASS_NAME,
String.format("-D%s=%s", key, value),
arg1, arg2};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));

final Configuration configuration = StandaloneJobClusterEntryPoint
.loadConfigurationFromClusterConfig(clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(SavepointRestoreSettings.fromConfiguration(configuration), is(equalTo(savepointRestoreSettings)));

assertThat(configuration.get(RestOptions.PORT), is(equalTo(restPort)));
assertThat(configuration.get(DeploymentOptions.TARGET), is(equalTo(value)));
}

@Test
public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing() throws FlinkParseException {
final JobID jobID = JobID.generate();
final String[] args = {
"-c", confDirPath,
"--job-id", jobID.toHexString()
};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final Configuration configuration = StandaloneJobClusterEntryPoint
.loadConfigurationFromClusterConfig(clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(SavepointRestoreSettings.fromConfiguration(configuration), is(equalTo(SavepointRestoreSettings.none())));
}

@Test
public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
final String key = "key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint.ExecutionMode;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Optional;

import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNot.not;
Expand All @@ -40,32 +37,6 @@
*/
public class StandaloneJobClusterEntryPointTest extends TestLogger {

/**
* Tests that the default {@link ExecutionMode} is {@link ExecutionMode#DETACHED}.
*/
@Test
public void testDefaultExecutionModeIsDetached() {
Configuration configuration = new Configuration();

StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration);

assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.DETACHED));
}

/**
* Tests that {@link ExecutionMode} is not overwritten if provided.
*/
@Test
public void testDontOverwriteExecutionMode() {
Configuration configuration = new Configuration();
setExecutionMode(configuration, ExecutionMode.NORMAL);

StandaloneJobClusterEntryPoint.setDefaultExecutionModeIfNotConfigured(configuration);

// Don't overwrite provided configuration
assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.NORMAL));
}

@Test
public void configuredJobIDTakesPrecedenceWithHA() {
Optional<JobID> jobID = Optional.of(JobID.generate());
Expand Down Expand Up @@ -120,14 +91,6 @@ public void jobIDdefaultsToRandomJobIDWithoutHA() {
assertThat(jobIdForCluster, is(not(ZERO_JOB_ID)));
}

private static void setExecutionMode(Configuration configuration, ExecutionMode executionMode) {
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
}

private static ExecutionMode getExecutionMode(Configuration configuration) {
return ExecutionMode.valueOf(configuration.getString(ClusterEntrypoint.EXECUTION_MODE));
}

private static void enableHighAvailability(final Configuration configuration) {
configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class DefaultDispatcherRunnerFactory implements DispatcherRunnerFactory {
private final DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory;

private DefaultDispatcherRunnerFactory(DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory) {
public DefaultDispatcherRunnerFactory(DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory) {
this.dispatcherLeaderProcessFactoryFactory = dispatcherLeaderProcessFactoryFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class DefaultDispatcherResourceManagerComponentFactory implements Dispatc
@Nonnull
private final RestEndpointFactory<?> restEndpointFactory;

DefaultDispatcherResourceManagerComponentFactory(
public DefaultDispatcherResourceManagerComponentFactory(
@Nonnull DispatcherRunnerFactory dispatcherRunnerFactory,
@Nonnull ResourceManagerFactory<?> resourceManagerFactory,
@Nonnull RestEndpointFactory<?> restEndpointFactory) {
Expand Down

0 comments on commit 74fc5a7

Please sign in to comment.