[FLINK-12388][docs] Update the production readiness checklist
This closes apache#8330
sjwiesman authored and StephanEwen committed May 10, 2019
1 parent d14fc9e commit 754cd71
Showing 2 changed files with 63 additions and 112 deletions.
87 changes: 31 additions & 56 deletions docs/ops/production_ready.md
@@ -22,79 +22,54 @@ specific language governing permissions and limitations
under the License.
-->

The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production.
While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs.

* ToC
{:toc}

## Production Readiness Checklist

The purpose of this production readiness checklist is to provide a condensed overview of configuration options that are
important and need **careful consideration** if you plan to bring your Flink job into **production**. For most of these options,
Flink provides out-of-the-box defaults to make usage and adoption easier. For many users and scenarios, those
defaults are good starting points for development and completely sufficient for "one-shot" jobs.

However, once you are planning to bring a Flink application to production, the requirements typically increase. For example,
you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions.

In the following, we present a collection of configuration options that you should check before your job goes into production.

### Set maximum parallelism for operators explicitly

Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications
for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity,
determines the maximum parallelism to which you can scale operators. It is important to understand that (as of now) there
is **no way to change** this parameter after your job has been started, except for restarting your job completely
from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way
to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is
likely a long-running operation that you want to avoid. At this point, you might wonder why not simply use a very high
value as the default for this parameter. The reason is that a high maximum parallelism can have some impact on your
application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which
can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your
future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular,
a maximum parallelism higher than 128 will typically result in slightly bigger state snapshots from the keyed backends.
### Set An Explicit Max Parallelism

Notice that maximum parallelism must fulfill the following conditions:
The max parallelism, which can be set at per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale.
There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operator's state.
The reason a maximum parallelism exists, rather than allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size.
Flink has to maintain specific metadata for its ability to rescale state, and this metadata grows linearly with the max parallelism.
In general, you should choose a max parallelism that is high enough to fit your future needs in scalability, while keeping it low enough to maintain reasonable performance.

`0 < parallelism <= max parallelism <= 2^15`
{% panel **Note:** Maximum parallelism must fulfill the following conditions: `0 < parallelism <= max parallelism <= 2^15` %}

You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum
parallelism as a function of the parallelism when the job is first started:
You can explicitly set the maximum parallelism by using `setMaxParallelism(int maxparallelism)`.
If no max parallelism is set, Flink will decide using a function of the operator's parallelism when the job is first started:

- `128` : for all parallelism <= 128.
- `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128.
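
For example, here is a minimal sketch of setting the max parallelism job-wide and overriding it for a single operator; the pipeline itself is purely illustrative and not part of the checklist:

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default: stateful operators may later scale out to at most 512 parallel subtasks (key groups).
        env.setMaxParallelism(512);

        env.fromElements("flink", "production", "checklist")
            .keyBy(word -> word)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String word) {
                    return word.toUpperCase();
                }
            })
            // Per-operator override of the job-wide max parallelism.
            .setMaxParallelism(1024)
            .print();

        env.execute("Max parallelism example");
    }
}
{% endhighlight %}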

### Set UUIDs for operators
### Set UUIDs For All Operators

As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for
operators. Those operator uids are important for Flink's mapping of operator states to operators which, in turn, is
essential for savepoints. By default operator uids are generated by traversing the JobGraph and hashing certain operator
properties. While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g.
exchanging an operator) will result in new UUIDs. To establish a stable mapping, we need stable operator uids provided
by the user through `setUid(String uid)`.
As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for each operator in their `DataStream`.
Uids are necessary for Flink's mapping of operator states to operators which, in turn, is essential for savepoints.
By default, operator uids are generated by traversing the JobGraph and hashing specific operator properties.
While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g., exchanging an operator) result in new UUIDs.
To establish a stable mapping, we need stable operator uids provided by the user through `setUid(String uid)`.
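
For illustration, a minimal sketch of assigning explicit uids; the uid strings are arbitrary examples, not a required naming scheme:

{% highlight java %}
// Assuming a StreamExecutionEnvironment named `env`, as in the sketch above.
env.fromElements(1L, 2L, 3L)
    .uid("number-source")            // stable uid for the source operator and its state
    .keyBy(value -> value % 2)
    .map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) {
            return value * 10;
        }
    })
    .uid("multiply-by-ten")          // stable uid so savepoint state keeps mapping to this operator
    .print();
{% endhighlight %}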

### Choice of state backend
### Choose The Right State Backend

Currently, Flink has the limitation that it can only restore the state from a savepoint for the same state backend that
took the savepoint. For example, this means that we can not take a savepoint with a memory state backend, then change
the job to use a RocksDB state backend and restore. While we are planning to make backends interoperable in the near
future, they are not yet. This means you should carefully consider which backend you use for your job before going to
production.
Currently, Flink's savepoint binary format is state backend specific.
A savepoint taken with one state backend cannot be restored using another, and you should carefully consider which backend you use before going to production.

In general, we recommend using RocksDB because this is currently the only state backend that supports large states (i.e.
state that exceeds the available main memory) and asynchronous snapshots. From our experience, asynchronous snapshots are
very important for large states because they do not block the operators and Flink can write the snapshots without stopping
stream processing. However, RocksDB can have worse performance than, for example, the memory-based state backends. If
you are sure that your state will never exceed main memory and blocking the stream processing to write it is not an issue,
you **could consider** not using the RocksDB backend. However, at this point, we **strongly recommend** using RocksDB
for production.
In general, we recommend avoiding `MemoryStateBackend` in production because it stores its snapshots inside the JobManager as opposed to persistent disk.
When deciding between `FsStateBackend` and `RocksDB`, it is a choice between performance and scalability.
`FsStateBackend` is very fast as each state access and update operates on objects on the Java heap; however, state size is limited by available memory within the cluster.
On the other hand, `RocksDB` can scale based on available disk space and is the only state backend to support incremental snapshots.
However, each state access and update requires (de-)serialization and potentially reading from disk which leads to average performance that is an order of magnitude slower than the memory state backends.
Carefully read through the [state backend documentation]({{ site.baseurl }}/ops/state/state_backends.html) to fully understand the pros and cons of each option.
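
For example, a sketch of choosing RocksDB programmatically; the checkpoint URI is a placeholder, and `RocksDBStateBackend` requires adding the `flink-statebackend-rocksdb` dependency:

{% highlight java %}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI; point this at durable storage such as HDFS or S3 in production.
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... define the pipeline and call env.execute() as usual ...
    }
}
{% endhighlight %}

The backend can alternatively be configured cluster-wide through the `state.backend` option in `flink-conf.yaml`.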

### Config JobManager High Availability (HA)
### Configure JobManager High Availability

The JobManager coordinates every Flink deployment. It is responsible for both *scheduling* and *resource management*.
The JobManager serves as the central coordinator for each Flink deployment and is responsible for both scheduling and resource management of the cluster.
It is a single point of failure within the cluster, and if it crashes, no new jobs can be submitted, and running applications will fail.

By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF):
if the JobManager crashes, no new programs can be submitted and running programs fail.
Configuring [High Availability]({{ site.baseurl }}/ops/jobmanager_high_availability.html), in conjunction with Apache ZooKeeper, allows for swift recovery and is highly recommended for production setups.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*.
We **strongly recommend** you configure [high availability]({{ site.baseurl }}/ops/jobmanager_high_availability.html) for production.
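
As a rough sketch, ZooKeeper-based high availability is enabled through `flink-conf.yaml`; the quorum addresses and storage directory below are placeholders for your own environment:

{% highlight yaml %}
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-node-1:2181,zk-node-2:2181,zk-node-3:2181
high-availability.storageDir: hdfs:///flink/ha/
{% endhighlight %}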

{% top %}
88 changes: 32 additions & 56 deletions docs/ops/production_ready.zh.md
@@ -22,79 +22,55 @@ specific language governing permissions and limitations
under the License.
-->

The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production.
While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs.

* ToC
{:toc}

## Production Readiness Checklist

The purpose of this production readiness checklist is to provide a condensed overview of configuration options that are
important and need **careful consideration** if you plan to bring your Flink job into **production**. For most of these options,
Flink provides out-of-the-box defaults to make usage and adoption easier. For many users and scenarios, those
defaults are good starting points for development and completely sufficient for "one-shot" jobs.

However, once you are planning to bring a Flink application to production, the requirements typically increase. For example,
you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions.

In the following, we present a collection of configuration options that you should check before your job goes into production.
### Set An Explicit Max Parallelism

### Set maximum parallelism for operators explicitly
The max parallelism, which can be set at per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale.
There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operator's state.
The reason a maximum parallelism exists, rather than allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size.
Flink has to maintain specific metadata for its ability to rescale state, and this metadata grows linearly with the max parallelism.
In general, you should choose a max parallelism that is high enough to fit your future needs in scalability, while keeping it low enough to maintain reasonable performance.

Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications
for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity,
determines the maximum parallelism to which you can scale operators. It is important to understand that (as of now) there
is **no way to change** this parameter after your job has been started, except for restarting your job completely
from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way
to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is
likely a long-running operation that you want to avoid. At this point, you might wonder why not simply use a very high
value as the default for this parameter. The reason is that a high maximum parallelism can have some impact on your
application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which
can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your
future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular,
a maximum parallelism higher than 128 will typically result in slightly bigger state snapshots from the keyed backends.
{% panel **Note:** Maximum parallelism must fulfill the following conditions: `0 < parallelism <= max parallelism <= 2^15` %}

Notice that maximum parallelism must fulfill the following conditions:

`0 < parallelism <= max parallelism <= 2^15`

You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum
parallelism as a function of the parallelism when the job is first started:
You can explicitly set the maximum parallelism by using `setMaxParallelism(int maxparallelism)`.
If no max parallelism is set, Flink will decide using a function of the operator's parallelism when the job is first started:

- `128` : for all parallelism <= 128.
- `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128.

### Set UUIDs for operators
### Set UUIDs For All Operators

As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for
operators. Those operator uids are important for Flink's mapping of operator states to operators which, in turn, is
essential for savepoints. By default operator uids are generated by traversing the JobGraph and hashing certain operator
properties. While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g.
exchanging an operator) will result in new UUIDs. To establish a stable mapping, we need stable operator uids provided
by the user through `setUid(String uid)`.
As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for each operator in their `DataStream`.
Uids are necessary for Flink's mapping of operator states to operators which, in turn, is essential for savepoints.
By default, operator uids are generated by traversing the JobGraph and hashing specific operator properties.
While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g., exchanging an operator) result in new UUIDs.
To establish a stable mapping, we need stable operator uids provided by the user through `setUid(String uid)`.

### Choice of state backend
### Choose The Right State Backend

Currently, Flink has the limitation that it can only restore the state from a savepoint for the same state backend that
took the savepoint. For example, this means that we can not take a savepoint with a memory state backend, then change
the job to use a RocksDB state backend and restore. While we are planning to make backends interoperable in the near
future, they are not yet. This means you should carefully consider which backend you use for your job before going to
production.
Currently, Flink's savepoint binary format is state backend specific.
A savepoint taken with one state backend cannot be restored using another, and you should carefully consider which backend you use before going to production.

In general, we recommend using RocksDB because this is currently the only state backend that supports large states (i.e.
state that exceeds the available main memory) and asynchronous snapshots. From our experience, asynchronous snapshots are
very important for large states because they do not block the operators and Flink can write the snapshots without stopping
stream processing. However, RocksDB can have worse performance than, for example, the memory-based state backends. If
you are sure that your state will never exceed main memory and blocking the stream processing to write it is not an issue,
you **could consider** not using the RocksDB backend. However, at this point, we **strongly recommend** using RocksDB
for production.
In general, we recommend avoiding `MemoryStateBackend` in production because it stores its snapshots inside the JobManager as opposed to persistent disk.
When deciding between `FsStateBackend` and `RocksDB`, it is a choice between performance and scalability.
`FsStateBackend` is very fast as each state access and update operates on objects on the Java heap; however, state size is limited by available memory within the cluster.
On the other hand, `RocksDB` can scale based on available disk space and is the only state backend to support incremental snapshots.
However, each state access and update requires (de-)serialization and potentially reading from disk which leads to average performance that is an order of magnitude slower than the memory state backends.
Carefully read through the [state backend documentation]({{ site.baseurl }}/ops/state/state_backends.html) to fully understand the pros and cons of each option.

### Config JobManager High Availability (HA)
### Configure JobManager High Availability

The JobManager coordinates every Flink deployment. It is responsible for both *scheduling* and *resource management*.
The JobManager serves as the central coordinator for each Flink deployment and is responsible for both scheduling and resource management of the cluster.
It is a single point of failure within the cluster, and if it crashes, no new jobs can be submitted, and running applications will fail.

By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF):
if the JobManager crashes, no new programs can be submitted and running programs fail.
Configuring [High Availability]({{ site.baseurl }}/ops/jobmanager_high_availability.html), in conjunction with Apache ZooKeeper, allows for swift recovery and is highly recommended for production setups.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*.
We **strongly recommend** you configure [high availability]({{ site.baseurl }}/ops/jobmanager_high_availability.html) for production.

{% top %}
