[FLINK-6513] [docs] cleaned up some typos and grammatical flaws
This closes apache#3858
alpinegizmo authored and greghogan committed May 10, 2017
1 parent 8a8d95e commit 71d7673
Showing 5 changed files with 46 additions and 42 deletions.
34 changes: 16 additions & 18 deletions docs/dev/best_practices.md
@@ -30,14 +30,12 @@ This page contains a collection of best practices for Flink programmers on how t

## Parsing command line arguments and passing them around in your Flink application

Almost all Flink applications, both batch and streaming, rely on external configuration parameters.
They are used to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application specific parameters (typically used within user functions).

Almost all Flink applications, both batch and streaming rely on external configuration parameters.
For example for specifying input and output sources (like paths or addresses), also system parameters (parallelism, runtime configuration) and application specific parameters (often used within the user functions).

Since version 0.9 we are providing a simple utility called `ParameterTool` to provide at least some basic tooling for solving these problems.

Please note that you don't have to use the `ParameterTool` explained here. Other frameworks such as [Commons CLI](https://commons.apache.org/proper/commons-cli/),
[argparse4j](http://argparse4j.sourceforge.net/) and others work well with Flink as well.
Flink provides a simple utility called `ParameterTool` to provide some basic tooling for solving these problems.
Please note that you don't have to use the `ParameterTool` described here. Other frameworks such as [Commons CLI](https://commons.apache.org/proper/commons-cli/) and
[argparse4j](http://argparse4j.sourceforge.net/) also work well with Flink.


### Getting your configuration values into the `ParameterTool`
@@ -89,8 +87,8 @@ parameter.getNumberOfParameters()
// .. there are more methods available.
{% endhighlight %}
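Most of this listing is collapsed in the diff view above. As a brief, hedged sketch of the kind of `ParameterTool` usage the surrounding section describes (it assumes this runs inside `main(String[] args)`; the file path and keys are purely illustrative):

{% highlight java %}
// from command line arguments, e.g. --input hdfs:///mydata --elements 42
ParameterTool parameters = ParameterTool.fromArgs(args);

// from a .properties file (the path is illustrative; this variant throws IOException)
ParameterTool fileParameters = ParameterTool.fromPropertiesFile("/path/to/application.properties");

// from Java system properties, e.g. -Dinput=hdfs:///mydata
ParameterTool systemParameters = ParameterTool.fromSystemProperties();

// reading values back out
String input = parameters.getRequired("input");
int elements = parameters.getInt("elements", 16); // falls back to a default value
{% endhighlight %}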

You can use the return values of these methods directly in the main() method (=the client submitting the application).
For example you could set the parallelism of a operator like this:
You can use the return values of these methods directly in the `main()` method of the client submitting the application.
For example, you could set the parallelism of a operator like this:

{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);
@@ -105,10 +103,10 @@ ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
{% endhighlight %}

and then use them inside the function for getting values from the command line.
and then use it inside the function for getting values from the command line.
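The body of the `Tokenizer` is collapsed in this diff view. A minimal, hedged sketch of a function that receives the `ParameterTool` through its constructor (the `wordPrefix` key is purely illustrative) could look like this:

{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private final ParameterTool parameters;

    public Tokenizer(ParameterTool parameters) {
        // ParameterTool is serializable, so it can simply be kept as a field
        this.parameters = parameters;
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // read a value that was passed on the command line
        String prefix = parameters.get("wordPrefix", "");
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(prefix + token, 1));
            }
        }
    }
}
{% endhighlight %}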


#### Passing it as a `Configuration` object to single functions
#### Passing parameters as a `Configuration` object to single functions

The example below shows how to pass the parameters as a `Configuration` object to a user defined function.

@@ -131,9 +129,9 @@ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<S
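The rest of the example is collapsed in this diff view. As a rough, hedged sketch of the pattern this section describes (it assumes the DataSet API, a `DataSet<String>` named `text`, that `ParameterTool.getConfiguration()` is available in the targeted Flink version, and an illustrative `minWordLength` key):

{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);

// hand the parameters to the user-defined function as a Configuration object
DataSet<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
            .withParameters(parameters.getConfiguration());
{% endhighlight %}

with the function reading the values in its `open(Configuration)` method:

{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private int minWordLength;

    @Override
    public void open(Configuration parameters) {
        // "minWordLength" is an illustrative key, not one from the original example
        this.minWordLength = parameters.getInteger("minWordLength", 0);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() >= minWordLength) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
{% endhighlight %}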

#### Register the parameters globally

Parameters registered as a global job parameter at the `ExecutionConfig` allow you to access the configuration values from the JobManager web interface and all functions defined by the user.
Parameters registered as global job parameters in the `ExecutionConfig` can be accessed as configuration values from the JobManager web interface and in all functions defined by the user.

**Register the parameters globally**
Register the parameters globally:

{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);
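// The remainder of this listing is collapsed in the diff view. A hedged sketch of
// what the section describes, assuming an ExecutionEnvironment or
// StreamExecutionEnvironment named `env`:

// make the parameters available globally, i.e. in the JobManager web interface
// and in all user-defined functions
env.getConfig().setGlobalJobParameters(parameters);

// inside a rich function they can then be retrieved from the runtime context:
//   ParameterTool params = (ParameterTool)
//       getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
{% endhighlight %}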
@@ -286,30 +284,30 @@ Change your projects `pom.xml` file like this:

The following changes were done in the `<dependencies>` section:

* Exclude all `log4j` dependencies from all Flink dependencies: This causes Maven to ignore Flink's transitive dependencies to log4j.
* Exclude the `slf4j-log4j12` artifact from Flink's dependencies: Since we are going to use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
* Exclude all `log4j` dependencies from all Flink dependencies: this causes Maven to ignore Flink's transitive dependencies to log4j.
* Exclude the `slf4j-log4j12` artifact from Flink's dependencies: since we are going to use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
* Add the Logback dependencies: `logback-core` and `logback-classic`
* Add dependencies for `log4j-over-slf4j`. `log4j-over-slf4j` is a tool which allows legacy applications which are directly using the Log4j APIs to use the Slf4j interface. Flink depends on Hadoop which is directly using Log4j for logging. Therefore, we need to redirect all logger calls from Log4j to Slf4j which is in turn logging to Logback.

Please note that you need to manually add the exclusions to all new Flink dependencies you are adding to the pom file.

You may also need to check if other dependencies (non Flink) are pulling in log4j bindings. You can analyze the dependencies of your project with `mvn dependency:tree`.
You may also need to check if other (non-Flink) dependencies are pulling in log4j bindings. You can analyze the dependencies of your project with `mvn dependency:tree`.



### Use Logback when running Flink on a cluster

This tutorial is applicable when running Flink on YARN or as a standalone cluster.

In order to use Logback instead of Log4j with Flink, you need to remove the `log4j-1.2.xx.jar` and `sfl4j-log4j12-xxx.jar` from the `lib/` directory.
In order to use Logback instead of Log4j with Flink, you need to remove `log4j-1.2.xx.jar` and `sfl4j-log4j12-xxx.jar` from the `lib/` directory.

Next, you need to put the following jar files into the `lib/` folder:

* `logback-classic.jar`
* `logback-core.jar`
* `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which is using Log4j) to Slf4j.

Note that you need to explicitly set the `lib/` directory when using a per job YARN cluster.
Note that you need to explicitly set the `lib/` directory when using a per-job YARN cluster.

The command to submit Flink on YARN with a custom logger is: `./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>`

11 changes: 6 additions & 5 deletions docs/dev/stream/checkpointing.md
@@ -32,7 +32,7 @@ any type of more elaborate operation.
In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions
in the streams to give the application the same semantics as a failure-free execution.

The [documentation on streaming fault tolerance](../../internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
The [documentation on streaming fault tolerance](../../internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism.


## Prerequisites
@@ -124,12 +124,13 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

## Selecting a State Backend

The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the [user-defined state](state.html) consistently to
provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured
Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) stores consistent snapshots
of all the state in timers and stateful operators, including connectors, windows, and any [user-defined state](state.html).
Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured
**State Backend**.

By default state will be kept in memory, and checkpoints will be stored in-memory at the master node (the JobManager). For proper persistence of large state,
Flink supports various forms of storing and checkpointing state in so called **State Backends**, which can be set via `StreamExecutionEnvironment.setStateBackend(…)`.
By default, state is kept in memory in the TaskManagers and checkpoints are stored in memory in the JobManager. For proper persistence of large state,
Flink supports various approaches for storing and checkpointing state in other state backends. The choice of state backend can be configured via `StreamExecutionEnvironment.setStateBackend(…)`.
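As a brief, hedged illustration of the two settings mentioned here (the checkpoint interval and the HDFS path are illustrative; `FsStateBackend` is one of the backends available in this Flink version):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// draw a checkpoint of the application state every 10 seconds
env.enableCheckpointing(10000);

// keep checkpoints on a (distributed) file system instead of in JobManager memory
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
{% endhighlight %}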

See [state backends](../../ops/state_backends.html) for more details on the available state backends and options for job-wide and cluster-wide configuration.

4 changes: 2 additions & 2 deletions docs/dev/stream/process_function.md
@@ -42,11 +42,11 @@ For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed s
`RuntimeContext`, similar to the way other stateful functions can access keyed state.

The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's
event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.
timers to manipulate keyed state.

<span class="label label-info">Note</span> If you want to access keyed state and timers you have
to apply the `ProcessFunction` on a keyed stream:
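The example that follows is collapsed in this diff view. A minimal, hedged sketch of applying a `ProcessFunction` to a keyed stream and registering a timer (the element type and the one-minute delay are illustrative) could look like this:

{% highlight java %}
DataStream<Tuple2<String, Long>> stream = ...;

// keyed state and timers are only available when the function runs on a keyed stream
DataStream<String> result = stream
    .keyBy(0)
    .process(new MyProcessFunction());

public static class MyProcessFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        // register an event-time timer one minute after this element's timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000);
        out.collect("seen: " + value.f0);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // state accessed here is scoped to the key for which the timer was registered
        out.collect("timer fired at " + timestamp);
    }
}
{% endhighlight %}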
4 changes: 2 additions & 2 deletions docs/dev/stream/side_output.md
@@ -55,8 +55,8 @@ val outputTag = OutputTag[String]("side-output")
Notice how the `OutputTag` is typed according to the type of elements that the side output stream
contains.

Emitting data to a side output it only possible when using a
[ProcessFunction]({{ site.baseurl }}/dev/stream/process_function.html). In the function, you can use the `Context` parameter
Emitting data to a side output is only possible from within a
[ProcessFunction]({{ site.baseurl }}/dev/stream/process_function.html). You can use the `Context` parameter
to emit data to a side output identified by an `OutputTag`:

<div class="codetabs" markdown="1">
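The language-tabbed example opened above is collapsed in this diff view. A minimal, hedged Java sketch of the pattern being described (assuming an input stream named `input` of type `DataStream<Integer>`):

{% highlight java %}
final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {

        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
            // emit to the regular output
            out.collect(value);
            // emit to the side output identified by the OutputTag
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });

// the side output stream is later retrieved from the result of the operation
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
{% endhighlight %}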