[FLINK-16210] Extend the Flink Architecture section with more information about Flink Master components and application execution
morsapaes authored and aljoscha committed May 25, 2020
1 parent 702a339 commit d88c195
Showing 1 changed file with 164 additions and 29 deletions.
193 changes: 164 additions & 29 deletions docs/concepts/flink-architecture.md
@@ -24,34 +24,32 @@ specific language governing permissions and limitations
under the License.
-->

Flink is a distributed system and requires effective allocation and management
of compute resources in order to execute streaming applications. It integrates
with all common cluster resource managers such as [Hadoop
YARN](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html),
[Apache Mesos](https://mesos.apache.org/) and
[Kubernetes](https://kubernetes.io/), but can also be set up to run as a
standalone cluster or even as a library.

This section contains an overview of Flink’s architecture and describes how its
main components interact to execute applications and recover from failures.

* This will be replaced by the TOC
{:toc}

## Anatomy of a Flink Cluster

The Flink runtime consists of two types of processes: a _Flink Master_ and one or more _Flink Workers_.

<img src="{{ site.baseurl }}/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="70%" />

The *Client* is not part of the runtime and program execution, but is used to
prepare and send a dataflow to the Flink Master. After that, the client can
disconnect (_detached mode_), or stay connected to receive progress reports
(_attached mode_). The client runs either as part of the Java/Scala program
that triggers the execution, or in the command line process `./bin/flink run
...`.
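For example, both modes map to the same CLI entry point (a sketch, assuming a local Flink distribution with the bundled streaming examples):

```bash
# Attached mode: the client stays connected and reports progress
# until the job finishes.
./bin/flink run ./examples/streaming/WordCount.jar

# Detached mode (-d / --detached): the client returns as soon as the
# submission is accepted; the job keeps running on the cluster.
./bin/flink run -d ./examples/streaming/WordCount.jar
```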

The Flink Master and TaskManagers can be started in various ways: directly on
the machines as a [standalone cluster]({% link
@@ -61,13 +59,50 @@ frameworks like [YARN]({% link ops/deployment/yarn_setup.md
TaskManagers connect to Flink Masters, announcing themselves as available, and
are assigned work.

### Flink Master

The _Flink Master_ has a number of responsibilities related to coordinating the distributed execution of Flink Applications:
it decides when to schedule the next task (or set of tasks), reacts to finished
tasks or execution failures, coordinates checkpoints, and coordinates recovery on
failures, among others. This process consists of three different components:

* **ResourceManager**

The _ResourceManager_ is responsible for resource de-/allocation and
provisioning in a Flink cluster — it manages **task slots**, which are the
unit of resource scheduling in a Flink cluster (see [Flink Workers](#flink-workers)).
Flink implements multiple ResourceManagers for different environments and
resource providers such as YARN, Mesos, Kubernetes and standalone
deployments. In a standalone setup, the ResourceManager can only distribute
the slots of available TaskManagers and cannot start new TaskManagers on
its own.

* **Dispatcher**

The _Dispatcher_ provides a REST interface to submit Flink applications for
execution and starts a new JobManager for each submitted job. It
also runs the Flink WebUI to provide information about job executions.

* **JobManager**

A _JobManager_ is responsible for managing the execution of a single
[JobGraph]({% link concepts/glossary.md %}#logical-graph).
Multiple jobs can run simultaneously in a Flink cluster, each having its
own JobManager.

There is always at least one Flink Master. A high-availability setup might have
multiple Flink Masters, one of which is always the *leader*, and the others are
*standby* (see [High Availability (HA)]({% link ops/jobmanager_high_availability.md %})).
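The Dispatcher's REST interface described above can also be used directly. As an illustrative sketch, assuming the cluster's REST endpoint listens on `localhost:8081` (`my-job.jar` and `<jar-id>` are placeholders):

```bash
# Upload a job JAR via the Dispatcher's REST API.
curl -X POST -F "jarfile=@my-job.jar" http://localhost:8081/jars/upload

# Run the uploaded JAR; the Dispatcher starts a JobManager for the new job.
curl -X POST "http://localhost:8081/jars/<jar-id>/run"

# List the jobs currently known to the cluster.
curl http://localhost:8081/jobs
```

The `./bin/flink` client and the Flink WebUI talk to this same REST interface.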

### Flink Workers

The *TaskManagers* (also called *workers*) execute the tasks of a dataflow, and buffer and exchange the data
streams.

There must always be at least one TaskManager. The smallest unit of resource scheduling in a TaskManager is a task _slot_. The number of task slots in a
TaskManager indicates the number of concurrent processing tasks. Note that
multiple operators may execute in a task slot (see [Tasks and Operator
Chains](#tasks-and-operator-chains)).
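The number of slots is configured per TaskManager. A minimal sketch, using the standard `taskmanager.numberOfTaskSlots` option (which defaults to 1):

```bash
# Give each TaskManager 4 slots; e.g. 3 TaskManagers can then schedule
# up to 12 parallel subtasks in total.
echo "taskmanager.numberOfTaskSlots: 4" >> conf/flink-conf.yaml
```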

{% top %}

@@ -78,8 +113,7 @@ For distributed execution, Flink *chains* operator subtasks together into
tasks is a useful optimization: it reduces the overhead of thread-to-thread
handover and buffering, and increases overall throughput while decreasing
latency. The chaining behavior can be configured; see the [chaining docs]({%
link dev/stream/operators/index.md %}#task-chaining-and-resource-groups) for details.

The sample dataflow in the figure below is executed with five subtasks, and
hence with five parallel threads.
@@ -129,4 +163,105 @@ two main benefits:

<img src="{{ site.baseurl }}/fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />

## Flink Application Execution

A _Flink Application_ is any user program that spawns one or multiple Flink
jobs from its ``main()`` method. The execution of these jobs can happen in a
local JVM (``LocalEnvironment``) or on a remote setup of clusters with multiple
machines (``RemoteEnvironment``). For each program, the
[``ExecutionEnvironment``]({{ site.baseurl }}/api/java/) provides methods to
control the job execution (e.g. setting the parallelism) and to interact with
the outside world (see [Anatomy of a Flink Program]({%
link dev/datastream_api.md %}#anatomy-of-a-flink-program)).
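The default parallelism for a job can also be set from the client at submission time, e.g. (a sketch; `my-job.jar` is a placeholder):

```bash
# Run the job with a default parallelism of 8; a parallelism set
# explicitly inside the program takes precedence over this flag.
./bin/flink run -p 8 my-job.jar
```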

The jobs of a Flink Application can either be submitted to a long-running
[Flink Session Cluster]({%
link concepts/glossary.md %}#flink-session-cluster), a dedicated [Flink Job
Cluster]({% link concepts/glossary.md %}#flink-job-cluster), or a
[Flink Application Cluster]({%
link concepts/glossary.md %}#flink-application-cluster). The difference between
these options is mainly related to the cluster’s lifecycle and to resource
isolation guarantees.

### Flink Session Cluster

* **Cluster Lifecycle**: in a Flink Session Cluster, the client connects to a
pre-existing, long-running cluster that can accept multiple job submissions.
Even after all jobs are finished, the cluster (and the Flink Master) will
keep running until the session is manually stopped. The lifetime of a Flink
Session Cluster is therefore not bound to the lifetime of any Flink Job.

* **Resource Isolation**: TaskManager slots are allocated by the
ResourceManager on job submission and released once the job is finished.
Because all jobs are sharing the same cluster, there is some competition for
cluster resources — like network bandwidth in the submit-job phase. One
limitation of this shared setup is that if one TaskManager crashes, then all
jobs that have tasks running on this worker will fail; in a similar way, if
some fatal error occurs on the Flink Master, it will affect all jobs running
in the cluster.

* **Other considerations**: having a pre-existing cluster saves a considerable
amount of time applying for resources and starting TaskManagers. This is
important in scenarios where the execution time of jobs is very short and a
high startup time would negatively impact the end-to-end user experience — as
is the case with interactive analysis of short queries, where it is desirable
that jobs can quickly perform computations using existing resources.

<div class="alert alert-info"> <strong>Note:</strong> Formerly, a Flink Session Cluster was also known as a Flink Cluster in <i>session mode</i>. </div>
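A local standalone setup illustrates the session lifecycle (a sketch, assuming a local Flink distribution):

```bash
# Start a long-running session cluster (Flink Master plus a TaskManager).
./bin/start-cluster.sh

# Submit several independent jobs to the same session...
./bin/flink run ./examples/streaming/WordCount.jar
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# ...and stop the session manually once you are done.
./bin/stop-cluster.sh
```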

### Flink Job Cluster

* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager
(like YARN or Kubernetes) is used to spin up a cluster for each submitted job
and this cluster is available to that job only. Here, the client first
requests resources from the cluster manager to start the Flink Master and
submits the job to the Dispatcher running inside this process. TaskManagers
are then lazily allocated based on the resource requirements of the job. Once
the job is finished, the Flink Job Cluster is torn down.

* **Resource Isolation**: a fatal error in the Flink Master only affects the one job running in that Flink Job Cluster.

* **Other considerations**: because the ResourceManager has to apply and wait
for external resource management components to start the TaskManager
processes and allocate resources, Flink Job Clusters are more suited to large
jobs that are long-running, have high-stability requirements and are not
sensitive to longer startup times.

<div class="alert alert-info"> <strong>Note:</strong> Formerly, a Flink Job Cluster was also known as a Flink Cluster in <i>job (or per-job) mode</i>. </div>
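On YARN, for example, a per-job cluster can be spun up directly from the client (a sketch; assumes a configured Hadoop/YARN environment, with `my-job.jar` as a placeholder):

```bash
# Start a dedicated YARN cluster for this job only; it is torn down
# automatically when the job finishes.
./bin/flink run -m yarn-cluster my-job.jar
```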

### Flink Application Cluster

* **Cluster Lifecycle**: a Flink Application Cluster is a dedicated Flink
cluster that only executes jobs from one Flink Application and where the
``main()`` method runs on the cluster rather than the client. The job
submission is a one-step process: you don’t need to start a Flink cluster
first and then submit a job to the existing cluster session; instead, you
package your application logic and dependencies into an executable job JAR and
the cluster entrypoint (``ApplicationClusterEntryPoint``)
is responsible for calling the ``main()`` method to extract the JobGraph.
This allows you to deploy a Flink Application like any other application on
Kubernetes, for example. The lifetime of a Flink Application Cluster is
therefore bound to the lifetime of the Flink Application.

* **Resource Isolation**: in a Flink Application Cluster, the ResourceManager
and Dispatcher are scoped to a single Flink Application, which provides a
better separation of concerns than the Flink Session Cluster.

<div class="alert alert-info"> <strong>Note:</strong> A Flink Job Cluster can be seen as a “run-on-client” alternative to Flink Application Clusters. </div>
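For a standalone deployment, the cluster entrypoint can be started directly with the application baked in (a sketch; `com.example.MyJob` is a placeholder entry class, and the job JAR is assumed to be on the entrypoint's classpath, e.g. under `lib/`):

```bash
# Start the Flink Master entrypoint that runs the application's main().
./bin/standalone-job.sh start --job-classname com.example.MyJob

# Provide task slots for the application's tasks.
./bin/taskmanager.sh start
```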

{% top %}

## Self-contained Flink Applications

When you want to create and deploy something like an event-driven application,
you should not have to think about and manage a cluster as well. So, there are
efforts in the community towards fully enabling _Flink-as-a-Library_ in the future.

The idea is that deploying a Flink Application becomes as easy as starting a
process: Flink would be just another library that you add to your application,
with no effect on how you deploy it. Such an application simply starts a set of
processes that connect to each other, figure out their roles (e.g. JobManager,
TaskManager), and execute the application in a distributed, parallel way. If the
application cannot keep up with the workload, Flink automatically starts new
processes to rescale (i.e. auto-scaling).

{% top %}
