[FLINK-13123] [cli] rename "-s" parameter of stop command
knaufk authored and kl0u committed Jul 11, 2019
1 parent a796fff commit 150bcca
Showing 5 changed files with 52 additions and 50 deletions.
37 changes: 19 additions & 18 deletions docs/ops/cli.md
@@ -176,23 +176,7 @@ These examples about how to manage a job in CLI.

- Gracefully stop a job with a savepoint (streaming jobs only):

./bin/flink stop -s [targetDirectory] -d <jobID>

**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:

On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
soon as possible.
If operators are not stopping after the cancel call, Flink will start interrupting the thread periodically
until it stops.

A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
job to finish processing all in-flight data.
./bin/flink stop [-p targetDirectory] [-d] <jobID>

### Savepoints

@@ -221,6 +205,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I

Everything else is the same as described in the above **Trigger a Savepoint** section.

#### Stop

Use the `stop` action to gracefully stop a running streaming job with a savepoint.

{% highlight bash %}
./bin/flink stop [-p targetDirectory] [-d] <jobID>
{% endhighlight %}

A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
job to finish processing all in-flight data.
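The flag handling described above can be sketched with plain `getopts` (illustrative only: Flink's real CLI uses Apache Commons CLI, which, unlike `getopts`, allows `-p` to appear without a value; the job ID `myJobId` and path `/savepoints` below are hypothetical):

```shell
# Sketch of how `flink stop [-p targetDirectory] [-d] <jobID>` arguments
# break down. NOT Flink's actual parser: here -p always requires a path.
set -- -p /savepoints -d myJobId   # hypothetical invocation

savepoint_path=""   # -p: where to write the savepoint (configured default if empty)
drain="false"       # -d: emit MAX_WATERMARK before the final checkpoint barrier
while getopts "p:d" opt; do
  case "$opt" in
    p) savepoint_path="$OPTARG" ;;
    d) drain="true" ;;
  esac
done
shift $((OPTIND - 1))
job_id="$1"         # trailing positional argument is the job ID

echo "path=$savepoint_path drain=$drain job=$job_id"
```

The order of `-p` and `-d` does not matter, but the job ID must come after all flags, matching the usage string above.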


#### Cancel with a savepoint (deprecated)

You can atomically trigger a savepoint and cancel a job.
@@ -431,7 +432,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipeline.
-s,--withSavepoint <withSavepoint> Path to the savepoint (for example
-p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
37 changes: 19 additions & 18 deletions docs/ops/cli.zh.md
@@ -175,23 +175,7 @@ available.

- Gracefully stop a job with a savepoint (streaming jobs only):

./bin/flink stop -s [targetDirectory] -d <jobID>


**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:

On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
soon as possible.
If operators are not stopping after the cancel call, Flink will start interrupting the thread periodically
until it stops.

A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
job to finish processing all in-flight data.
./bin/flink stop [-p targetDirectory] [-d] <jobID>

### Savepoints

@@ -220,6 +204,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I

Everything else is the same as described in the above **Trigger a Savepoint** section.

#### Stop

Use the `stop` action to gracefully stop a running streaming job with a savepoint.

{% highlight bash %}
./bin/flink stop [-p targetDirectory] [-d] <jobID>
{% endhighlight %}

A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
barrier. This will cause all registered event-time timers to fire, thus flushing out any state that is waiting for
a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
job to finish processing all in-flight data.


#### Cancel with a savepoint (deprecated)

You can atomically trigger a savepoint and cancel a job.
@@ -426,7 +427,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipeline.
-s,--withSavepoint <withSavepoint> Path to the savepoint (for example
-p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
@@ -113,7 +113,7 @@ public class CliFrontendParser {
"specified, the configured default directory (" +
CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");

public static final Option STOP_WITH_SAVEPOINT = new Option("s", "withSavepoint", true,
public static final Option STOP_WITH_SAVEPOINT_PATH = new Option("p", "savepointPath", true,
"Path to the savepoint (for example hdfs:///flink/savepoint-1537). " +
"If no directory is specified, the configured default will be used (\"" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "\").");

@@ -176,9 +176,9 @@ public class CliFrontendParser {
CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

STOP_WITH_SAVEPOINT.setRequired(false);
STOP_WITH_SAVEPOINT.setArgName("withSavepoint");
STOP_WITH_SAVEPOINT.setOptionalArg(true);
STOP_WITH_SAVEPOINT_PATH.setRequired(false);
STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

STOP_AND_DRAIN.setRequired(false);

@@ -256,7 +256,7 @@ static Options getCancelCommandOptions() {

static Options getStopCommandOptions() {
return buildGeneralOptions(new Options())
.addOption(STOP_WITH_SAVEPOINT)
.addOption(STOP_WITH_SAVEPOINT_PATH)
.addOption(STOP_AND_DRAIN);
}

@@ -293,7 +293,7 @@ private static Options getCancelOptionsWithoutDeprecatedOptions(Options options)

private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
return options
.addOption(STOP_WITH_SAVEPOINT)
.addOption(STOP_WITH_SAVEPOINT_PATH)
.addOption(STOP_AND_DRAIN);
}

@@ -21,7 +21,7 @@
import org.apache.commons.cli.CommandLine;

import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN;
import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT;
import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH;

/**
* Command line options for the STOP command.
@@ -41,8 +41,8 @@ class StopOptions extends CommandLineOptions {
super(line);
this.args = line.getArgs();

this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT.getOpt());
this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT.getOpt());
this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT_PATH.getOpt());
this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT_PATH.getOpt());

this.advanceToEndOfEventTime = line.hasOption(STOP_AND_DRAIN.getOpt());
}
@@ -80,7 +80,7 @@ public void testStopWithOnlyJobId() throws Exception {
public void testStopWithDefaultSavepointDir() throws Exception {
JobID jid = new JobID();

String[] parameters = { "-s", jid.toString() };
String[] parameters = { jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -93,7 +93,7 @@ public void testStopWithDefaultSavepointDir() throws Exception {
public void testStopWithExplicitSavepointDir() throws Exception {
JobID jid = new JobID();

String[] parameters = { "-s", "test-target-dir", jid.toString() };
String[] parameters = { "-p", "test-target-dir", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -119,7 +119,7 @@ public void testStopOnlyWithMaxWM() throws Exception {
public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
JobID jid = new JobID();

String[] parameters = { "-s", "-d", jid.toString() };
String[] parameters = { "-p", "-d", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -132,7 +132,7 @@ public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
public void testStopWithMaxWMAndExplicitSavepointDir() throws Exception {
JobID jid = new JobID();

String[] parameters = { "-d", "-s", "test-target-dir", jid.toString() };
String[] parameters = { "-d", "-p", "test-target-dir", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -181,7 +181,7 @@ public void testUnknownJobId() throws Exception {
// test unknown job Id
JobID jid = new JobID();

String[] parameters = { "-s", "test-target-dir", jid.toString() };
String[] parameters = { "-p", "test-target-dir", jid.toString() };
String expectedMessage = "Test exception";
FlinkException testException = new FlinkException(expectedMessage);
final ClusterClient<String> clusterClient = createClusterClient(testException);
