Skip to content

Commit

Permalink
[FLINK-10164] Add support for resuming from a savepoint to Standalone…
Browse files Browse the repository at this point in the history
…JobClusterEntrypoint

The StandaloneJobClusterEntryPoint now accepts CLI options to specify a savepoint path and
whether to allow non-restored state. If the entrypoint is started with a savepoint
path, the job will try to resume from this savepoint.

This closes apache#6572.
  • Loading branch information
tillrohrmann committed Aug 22, 2018
1 parent 3fd6587 commit d593136
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
Expand Down Expand Up @@ -76,10 +77,10 @@ public class CliFrontendParser {
"Address of the JobManager (master) to which to connect. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration.");

static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
/**
 * CLI option ({@code -s} / {@code --fromSavepoint}) that takes the path of the savepoint
 * the job should be restored from.
 */
public static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
	"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
"Allow to skip savepoint state that cannot be restored. " +
"You need to allow this if you removed an operator from your " +
"program that was part of the program when the savepoint was triggered.");
Expand Down Expand Up @@ -401,6 +402,16 @@ private static void printCustomCliOptions(
}
}

/**
 * Builds the {@link SavepointRestoreSettings} described by the given command line.
 *
 * @param commandLine parsed command line to inspect
 * @return settings for the path given via {@code SAVEPOINT_PATH_OPTION}, allowing non restored
 *         state iff {@code SAVEPOINT_ALLOW_NON_RESTORED_OPTION} is present;
 *         {@link SavepointRestoreSettings#none()} if no savepoint path was given
 */
public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
	final String savepointOpt = SAVEPOINT_PATH_OPTION.getOpt();

	// Without a savepoint path there is nothing to restore from.
	if (!commandLine.hasOption(savepointOpt)) {
		return SavepointRestoreSettings.none();
	}

	final String restorePath = commandLine.getOptionValue(savepointOpt);
	final boolean skipUnrestorableState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
	return SavepointRestoreSettings.forPath(restorePath, skipUnrestorableState);
}

// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;

/**
Expand Down Expand Up @@ -116,13 +114,7 @@ else if (args.length > 0) {
detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
YARN_DETACHED_OPTION.getOpt());

if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
} else {
this.savepointSettings = SavepointRestoreSettings.none();
}
this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
}

public String getJarFilePath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.container.entrypoint;

import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import javax.annotation.Nonnull;

Expand All @@ -28,16 +29,26 @@
* Configuration for the {@link StandaloneJobClusterEntryPoint}.
*/
final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {

@Nonnull
private final String jobClassName;

public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) {
@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

/**
 * Creates the configuration of a standalone job cluster.
 *
 * @param configDir forwarded to the {@link EntrypointClusterConfiguration} constructor
 * @param dynamicProperties forwarded to the {@link EntrypointClusterConfiguration} constructor
 * @param args forwarded to the {@link EntrypointClusterConfiguration} constructor
 * @param restPort forwarded to the {@link EntrypointClusterConfiguration} constructor
 * @param jobClassName value returned by {@link #getJobClassName()}
 * @param savepointRestoreSettings value returned by {@link #getSavepointRestoreSettings()}
 */
public StandaloneJobClusterConfiguration(
		@Nonnull String configDir,
		@Nonnull Properties dynamicProperties,
		@Nonnull String[] args,
		int restPort,
		@Nonnull String jobClassName,
		@Nonnull SavepointRestoreSettings savepointRestoreSettings) {
	super(configDir, dynamicProperties, args, restPort);
	this.savepointRestoreSettings = savepointRestoreSettings;
	this.jobClassName = jobClassName;
}

/**
 * Returns the job class name this configuration was created with.
 */
@Nonnull
String getJobClassName() {
	return this.jobClassName;
}

/**
 * Returns the savepoint restore settings this configuration was created with.
 */
@Nonnull
public SavepointRestoreSettings getSavepointRestoreSettings() {
	return this.savepointRestoreSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
Expand Down Expand Up @@ -53,6 +55,8 @@ public Options getOptions() {
options.addOption(REST_PORT_OPTION);
options.addOption(JOB_CLASS_NAME_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);

return options;
}
Expand All @@ -64,12 +68,14 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortString);
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);

return new StandaloneJobClusterConfiguration(
configDir,
dynamicProperties,
commandLine.getArgs(),
restPort,
jobClassName);
jobClassName,
savepointRestoreSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
Expand Down Expand Up @@ -63,10 +64,18 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
@Nonnull
private final String jobClassName;

StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) {
@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

/**
 * Creates an entry point for the job identified by the given class name.
 *
 * @param configuration passed on to the {@code JobClusterEntrypoint} constructor
 * @param jobClassName name of the job class; must not be null
 * @param savepointRestoreSettings settings that {@code retrieveJobGraph} sets on the job graph;
 *                                 must not be null
 * @param programArguments program arguments; must not be null
 */
StandaloneJobClusterEntryPoint(
		Configuration configuration,
		@Nonnull String jobClassName,
		@Nonnull SavepointRestoreSettings savepointRestoreSettings,
		@Nonnull String[] programArguments) {
	super(configuration);
	this.programArguments = checkNotNull(programArguments);
	this.jobClassName = checkNotNull(jobClassName);
	// Fail fast like the other @Nonnull arguments instead of handing a null on to the
	// job graph in retrieveJobGraph.
	this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings);
}

@Override
Expand All @@ -76,6 +85,7 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
try {
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
jobGraph.setAllowQueuedScheduling(true);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

return jobGraph;
} catch (Exception e) {
Expand Down Expand Up @@ -148,8 +158,10 @@ public static void main(String[] args) {

configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());

StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration,
StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
configuration,
clusterConfiguration.getJobClassName(),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs());

entrypoint.startCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand All @@ -38,28 +39,30 @@
public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {

private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
private static final String JOB_CLASS_NAME = "foobar";
private static final String CONFIG_DIR = "/foo/bar";

@Test
public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
final String configDir = "/foo/bar";
final String key = "key";
final String value = "value";
final int restPort = 1234;
final String jobClassName = "foobar";
final String arg1 = "arg1";
final String arg2 = "arg2";
final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2};
final String[] args = {"--configDir", CONFIG_DIR, "--webui-port", String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getConfigDir(), is(equalTo(CONFIG_DIR)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();

assertThat(dynamicProperties, hasEntry(key, value));

assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));

assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
}

@Test
Expand All @@ -81,4 +84,17 @@ public void testMissingRequiredArgument() throws FlinkParseException {

commandLineParser.parse(args);
}

/**
 * Verifies that a savepoint path ({@code -s}) and the allow-non-restored-state flag
 * ({@code -n}) given on the command line end up in the parsed configuration.
 */
@Test
public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
	final String expectedRestorePath = "foobar";
	final String[] commandLineArgs = {"-c", CONFIG_DIR, "-j", JOB_CLASS_NAME, "-s", expectedRestorePath, "-n"};

	final SavepointRestoreSettings parsedSettings = commandLineParser
		.parse(commandLineArgs)
		.getSavepointRestoreSettings();

	assertThat(parsedSettings.restoreSavepoint(), is(true));
	assertThat(parsedSettings.getRestorePath(), is(equalTo(expectedRestorePath)));
	assertThat(parsedSettings.allowNonRestoredState(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

Expand All @@ -35,6 +36,8 @@
*/
public class StandaloneJobClusterEntryPointTest extends TestLogger {

public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};

@Test
public void testJobGraphRetrieval() throws FlinkException {
final Configuration configuration = new Configuration();
Expand All @@ -43,12 +46,27 @@ public void testJobGraphRetrieval() throws FlinkException {
final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
configuration,
TestJob.class.getCanonicalName(),
new String[] {"--arg", "suffix"});
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS);

final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
}

/**
 * Verifies that the savepoint restore settings handed to the entry point are set on the
 * job graph it retrieves.
 */
@Test
public void testSavepointRestoreSettings() throws FlinkException {
	final SavepointRestoreSettings expectedSettings = SavepointRestoreSettings.forPath("foobar", true);
	final Configuration configuration = new Configuration();

	final StandaloneJobClusterEntryPoint entryPoint = new StandaloneJobClusterEntryPoint(
		configuration,
		TestJob.class.getCanonicalName(),
		expectedSettings,
		PROGRAM_ARGUMENTS);

	final SavepointRestoreSettings actualSettings = entryPoint
		.retrieveJobGraph(configuration)
		.getSavepointRestoreSettings();

	assertThat(actualSettings, is(equalTo(expectedSettings)));
}
}

0 comments on commit d593136

Please sign in to comment.