Skip to content

Commit

Permalink
[FLINK-2976] [clients] Add savepoint commands to CliFrontend
Browse files Browse the repository at this point in the history
[comments] Use handleError(Throwable)
  • Loading branch information
uce committed Jan 11, 2016
1 parent d739ee2 commit 3607575
Show file tree
Hide file tree
Showing 12 changed files with 689 additions and 62 deletions.
140 changes: 139 additions & 1 deletion flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class CliFrontend {
public static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_SAVEPOINT = "savepoint";

// config dir parameters
private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
Expand Down Expand Up @@ -301,6 +302,8 @@ protected int run(String[] args) {
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());

LOG.debug("Savepoint path is set to {}", options.getSavepointPath());

try {
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
Expand Down Expand Up @@ -630,6 +633,135 @@ protected int cancel(String[] args) {
}
}

/**
* Executes the SAVEPOINT action.
*
* @param args Command line arguments for the cancel action.
*/
protected int savepoint(String[] args) {
LOG.info("Running 'savepoint' command.");

SavepointOptions options;
try {
options = CliFrontendParser.parseSavepointCommand(args);
}
catch (CliArgsException e) {
return handleArgException(e);
}
catch (Throwable t) {
return handleError(t);
}

// evaluate help flag
if (options.isPrintHelp()) {
CliFrontendParser.printHelpForCancel();
return 0;
}

if (options.isDispose()) {
// Discard
return disposeSavepoint(options, options.getDisposeSavepointPath());
}
else {
// Trigger
String[] cleanedArgs = options.getArgs();
JobID jobId;

if (cleanedArgs.length > 0) {
String jobIdString = cleanedArgs[0];
try {
jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
}
catch (Exception e) {
return handleError(new IllegalArgumentException(
"Error: The value for the Job ID is not a valid ID."));
}
}
else {
return handleError(new IllegalArgumentException(
"Error: The value for the Job ID is not a valid ID. " +
"Specify a Job ID to trigger a savepoint."));
}

return triggerSavepoint(options, jobId);
}
}

/**
* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
* message to the job manager.
*/
private int triggerSavepoint(SavepointOptions options, JobID jobId) {
try {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId), askTimeout);

Object result;
try {
logAndSysout("Triggering savepoint for job " + jobId + ". Waiting for response...");
result = Await.result(response, askTimeout);
}
catch (Exception e) {
throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
}

if (result instanceof TriggerSavepointSuccess) {
TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
logAndSysout("Savepoint completed. Path: " + success.savepointPath());
logAndSysout("You can resume your program from this savepoint with the run command.");

return 0;
}
else if (result instanceof TriggerSavepointFailure) {
TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
throw failure.cause();
}
else {
throw new IllegalStateException("Unknown JobManager response of type " +
result.getClass());
}
}
catch (Throwable t) {
return handleError(t);
}
}

/**
* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
* message to the job manager.
*/
private int disposeSavepoint(SavepointOptions options, String savepointPath) {
try {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout);

Object result;
try {
logAndSysout("Disposing savepoint '" + savepointPath + "'. Waiting for response...");
result = Await.result(response, askTimeout);
}
catch (Exception e) {
throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
}

if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
logAndSysout("Savepoint '" + savepointPath + "' disposed.");
return 0;
}
else if (result instanceof DisposeSavepointFailure) {
DisposeSavepointFailure failure = (DisposeSavepointFailure) result;
throw failure.cause();
}
else {
throw new IllegalStateException("Unknown JobManager response of type " +
result.getClass());
}
}
catch (Throwable t) {
return handleError(t);
}
}

// --------------------------------------------------------------------------------------------
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -719,9 +851,13 @@ else if (!jarFile.isFile()) {
// Get assembler class
String entryPointClass = options.getEntryPointClassName();

return entryPointClass == null ?
PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

program.setSavepointPath(options.getSavepointPath());

return program;
}

/**
Expand Down Expand Up @@ -993,6 +1129,8 @@ public Integer run() throws Exception {
return info(params);
case ACTION_CANCEL:
return cancel(params);
case ACTION_SAVEPOINT:
return savepoint(params)
case "-h":
case "--help":
CliFrontendParser.printHelp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class CliFrontendParser {
+ "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
"different JobManager than the one specified in the configuration.");

static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
"Path to a savepoint to reset the job back to (for example file:https:///flink/savepoint-1537).");

static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
"Disposes an existing savepoint.");

// list specific options
static final Option RUNNING_OPTION = new Option("r", "running", false,
"Show only running programs and their JobIDs");
Expand Down Expand Up @@ -105,13 +111,19 @@ public class CliFrontendParser {

RUNNING_OPTION.setRequired(false);
SCHEDULED_OPTION.setRequired(false);

SAVEPOINT_PATH_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

SAVEPOINT_DISPOSE_OPTION.setRequired(false);
SAVEPOINT_DISPOSE_OPTION.setArgName("savepointPath");
}

private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));

private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options()));

private static Options buildGeneralOptions(Options options) {
options.addOption(HELP_OPTION);
Expand All @@ -128,6 +140,7 @@ public static Options getProgramSpecificOptions(Options options) {
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);

// also add the YARN options so that the parser can parse them
yarnSessionCLi.getYARNSessionCLIOptions(options);
Expand All @@ -140,6 +153,7 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
return options;
}

Expand Down Expand Up @@ -183,6 +197,12 @@ private static Options getCancelOptions(Options options) {
return options;
}

private static Options getSavepointOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return options;
}

// --------------------------------------------------------------------------------------------
// Help
// --------------------------------------------------------------------------------------------
Expand All @@ -199,6 +219,7 @@ public static void printHelp() {
printHelpForInfo();
printHelpForList();
printHelpForCancel();
printHelpForSavepoint();

System.out.println();
}
Expand Down Expand Up @@ -255,6 +276,18 @@ public static void printHelpForCancel() {
System.out.println();
}

public static void printHelpForSavepoint() {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);

System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
System.out.println();
}

// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -292,6 +325,17 @@ public static CancelOptions parseCancelCommand(String[] args) throws CliArgsExce
}
}

public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException {
try {
PosixParser parser = new PosixParser();
CommandLine line = parser.parse(SAVEPOINT_OPTIONS, args, false);
return new SavepointOptions(line);
}
catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}

public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException {
try {
PosixParser parser = new PosixParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@

import org.apache.commons.cli.CommandLine;

import java.net.URL;
import java.net.MalformedURLException;
import java.util.List;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;

/**
* Base class for command line options that refer to a JAR file program.
Expand All @@ -52,6 +53,8 @@ public abstract class ProgramOptions extends CommandLineOptions {

private final boolean detachedMode;

private final String savepointPath;

protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);

Expand Down Expand Up @@ -105,6 +108,12 @@ else if (args.length > 0) {

stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt());

if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
} else {
savepointPath = null;
}
}

public String getJarFilePath() {
Expand Down Expand Up @@ -134,4 +143,8 @@ public boolean getStdoutLogging() {
public boolean getDetachedMode() {
return detachedMode;
}

public String getSavepointPath() {
return savepointPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;

import org.apache.commons.cli.CommandLine;

import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;

/**
* Command line options for the SAVEPOINT command
*/
public class SavepointOptions extends CommandLineOptions {

private final String[] args;
private boolean dispose;
private String disposeSavepointPath;

public SavepointOptions(CommandLine line) {
super(line);
this.args = line.getArgs();
this.dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
this.disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
}

public String[] getArgs() {
return args == null ? new String[0] : args;
}

public boolean isDispose() {
return dispose;
}

public String getDisposeSavepointPath() {
return disposeSavepointPath;
}
}
Loading

0 comments on commit 3607575

Please sign in to comment.