From 150bccac338265953f5684210d47b01037341007 Mon Sep 17 00:00:00 2001
From: Konstantin Knauf
Date: Wed, 10 Jul 2019 10:59:57 +0200
Subject: [PATCH] [FLINK-13123] [cli] rename "-s" parameter of stop command

---
 docs/ops/cli.md                                    | 37 ++++++++++---------
 docs/ops/cli.zh.md                                 | 37 ++++++++++---------
 .../flink/client/cli/CliFrontendParser.java        | 12 +++---
 .../apache/flink/client/cli/StopOptions.java       |  6 +--
 .../cli/CliFrontendStopWithSavepointTest.java      | 10 ++---
 5 files changed, 52 insertions(+), 50 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 5eeed312f4870..8dabcbf85e732 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -176,23 +176,7 @@ These examples about how to manage a job in CLI.
 
 -   Gracefully stop a job with a savepoint (streaming jobs only):
 
-        ./bin/flink stop -s [targetDirectory] -d
-
-
-**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:
-
-On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
-soon as possible.
-If operators are not not stopping after the cancel call, Flink will start interrupting the thread periodically
-until it stops.
-
-A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
-source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
-that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
-`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
-barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
-a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
- job to finish processing all in-flight data.
+        ./bin/flink stop [-p targetDirectory] [-d]
 
 ### Savepoints
 
@@ -221,6 +205,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I
 
 Everything else is the same as described in the above **Trigger a Savepoint** section.
 
+#### Stop
+
+Use the `stop` command to gracefully stop a running streaming job with a savepoint.
+
+{% highlight bash %}
+./bin/flink stop [-p targetDirectory] [-d]
+{% endhighlight %}
+
+A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
+source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
+that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
+`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
+barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
+a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
+job to finish processing all in-flight data.
+
+
 #### Cancel with a savepoint (deprecated)
 
 You can atomically trigger a savepoint and cancel a job.
 
@@ -431,7 +432,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
 
   "stop" action options:
      -d,--drain              Send MAX_WATERMARK before taking the
                              savepoint and stopping the pipelne.
-     -s,--withSavepoint      Path to the savepoint (for example
+     -p,--savepointPath      Path to the savepoint (for example
                              hdfs:///flink/savepoint-1537).
                             If no directory is specified, the
                             configured default will be used
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index ce6049f6d1a78..c4ecf9d0f911a 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -175,23 +175,7 @@ available.
 
 -   Gracefully stop a job with a savepoint (streaming jobs only):
 
-        ./bin/flink stop -s [targetDirectory] -d
-
-
-**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:
-
-On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
-soon as possible.
-If operators are not not stopping after the cancel call, Flink will start interrupting the thread periodically
-until it stops.
-
-A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
-source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
-that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
-`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
-barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
-a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
- job to finish processing all in-flight data.
+        ./bin/flink stop [-p targetDirectory] [-d]
 
 ### Savepoints
 
@@ -220,6 +204,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I
 
 Everything else is the same as described in the above **Trigger a Savepoint** section.
 
+#### Stop
+
+Use the `stop` command to gracefully stop a running streaming job with a savepoint.
+
+{% highlight bash %}
+./bin/flink stop [-p targetDirectory] [-d]
+{% endhighlight %}
+
+A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
+source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
+that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
+`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
+barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
+a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
+job to finish processing all in-flight data.
+
+
 #### Cancel with a savepoint (deprecated)
 
 You can atomically trigger a savepoint and cancel a job.
 
@@ -426,7 +427,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
 
   "stop" action options:
      -d,--drain              Send MAX_WATERMARK before taking the
                              savepoint and stopping the pipelne.
-     -s,--withSavepoint      Path to the savepoint (for example
+     -p,--savepointPath      Path to the savepoint (for example
                              hdfs:///flink/savepoint-1537).
                             If no directory is specified, the
                             configured default will be used
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 3219e40f02d0b..5639cb545cec0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -113,7 +113,7 @@ public class CliFrontendParser {
 		"specified, the configured default directory (" +
 		CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
 
-	public static final Option STOP_WITH_SAVEPOINT = new Option("s", "withSavepoint", true,
+	public static final Option STOP_WITH_SAVEPOINT_PATH = new Option("p", "savepointPath", true,
 		"Path to the savepoint (for example hdfs:///flink/savepoint-1537). " +
 			"If no directory is specified, the configured default will be used (\"" +
 			CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "\").");
@@ -176,9 +176,9 @@ public class CliFrontendParser {
 		CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
 		CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
 
-		STOP_WITH_SAVEPOINT.setRequired(false);
-		STOP_WITH_SAVEPOINT.setArgName("withSavepoint");
-		STOP_WITH_SAVEPOINT.setOptionalArg(true);
+		STOP_WITH_SAVEPOINT_PATH.setRequired(false);
+		STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
+		STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);
 
 		STOP_AND_DRAIN.setRequired(false);
@@ -256,7 +256,7 @@ static Options getCancelCommandOptions() {
 
 	static Options getStopCommandOptions() {
 		return buildGeneralOptions(new Options())
-				.addOption(STOP_WITH_SAVEPOINT)
+				.addOption(STOP_WITH_SAVEPOINT_PATH)
 				.addOption(STOP_AND_DRAIN);
 	}
 
@@ -293,7 +293,7 @@ private static Options getCancelOptionsWithoutDeprecatedOptions(Options options)
 
 	private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
 		return options
-				.addOption(STOP_WITH_SAVEPOINT)
+				.addOption(STOP_WITH_SAVEPOINT_PATH)
 				.addOption(STOP_AND_DRAIN);
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 1ade31cff00a0..c5693b75e84b9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -21,7 +21,7 @@
 import org.apache.commons.cli.CommandLine;
 
 import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN;
-import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT;
+import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH;
 
 /**
  * Command line options for the STOP command.
@@ -41,8 +41,8 @@ class StopOptions extends CommandLineOptions {
 		super(line);
 		this.args = line.getArgs();
 
-		this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT.getOpt());
-		this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT.getOpt());
+		this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT_PATH.getOpt());
+		this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT_PATH.getOpt());
 
 		this.advanceToEndOfEventTime = line.hasOption(STOP_AND_DRAIN.getOpt());
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
index 2c829ccea1c31..19872aaf88ed7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
@@ -80,7 +80,7 @@ public void testStopWithOnlyJobId() throws Exception {
 	public void testStopWithDefaultSavepointDir() throws Exception {
 		JobID jid = new JobID();
 
-		String[] parameters = { "-s", jid.toString() };
+		String[] parameters = { jid.toString() };
 		final ClusterClient clusterClient = createClusterClient(null);
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 		testFrontend.stop(parameters);
@@ -93,7 +93,7 @@ public void testStopWithDefaultSavepointDir() throws Exception {
 	public void testStopWithExplicitSavepointDir() throws Exception {
 		JobID jid = new JobID();
 
-		String[] parameters = { "-s", "test-target-dir", jid.toString() };
+		String[] parameters = { "-p", "test-target-dir", jid.toString() };
 		final ClusterClient clusterClient = createClusterClient(null);
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 		testFrontend.stop(parameters);
@@ -119,7 +119,7 @@ public void testStopOnlyWithMaxWM() throws Exception {
 	public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
 		JobID jid = new JobID();
 
-		String[] parameters = { "-s", "-d", jid.toString() };
+		String[] parameters = { "-p", "-d", jid.toString() };
 		final ClusterClient clusterClient = createClusterClient(null);
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 		testFrontend.stop(parameters);
@@ -132,7 +132,7 @@ public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
 	public void testStopWithMaxWMAndExplicitSavepointDir() throws Exception {
 		JobID jid = new JobID();
 
-		String[] parameters = { "-d", "-s", "test-target-dir", jid.toString() };
+		String[] parameters = { "-d", "-p", "test-target-dir", jid.toString() };
 		final ClusterClient clusterClient = createClusterClient(null);
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 		testFrontend.stop(parameters);
@@ -181,7 +181,7 @@ public void testUnknownJobId() throws Exception {
 		// test unknown job Id
 		JobID jid = new JobID();
 
-		String[] parameters = { "-s", "test-target-dir", jid.toString() };
+		String[] parameters = { "-p", "test-target-dir", jid.toString() };
 		String expectedMessage = "Test exception";
 		FlinkException testException = new FlinkException(expectedMessage);
 		final ClusterClient clusterClient = createClusterClient(testException);
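
For reviewers, a quick usage sketch of the renamed option, assembled from the docs and tests touched by this patch. The job ID is a placeholder passed as the last positional argument, as in `CliFrontendStopWithSavepointTest`; the target directory is whatever savepoint directory the deployment uses.

{% highlight bash %}
# Old syntax (before this patch): savepoint target directory via -s/--withSavepoint
./bin/flink stop -s [targetDirectory] -d <jobID>

# New syntax (after this patch): savepoint target directory via -p/--savepointPath;
# -d/--drain still emits MAX_WATERMARK before the final checkpoint barrier
./bin/flink stop -p [targetDirectory] -d <jobID>

# Because the option's argument is optional (setOptionalArg(true)), -p may be
# given without a directory, and the flag may be omitted entirely; in both
# cases the configured default savepoint directory is used
./bin/flink stop -p -d <jobID>
./bin/flink stop <jobID>
{% endhighlight %}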