[FLINK-3073] Replace Streaming Mode by Memory Allocation Mode
Before, streaming mode (either batch or streaming) would specify how
memory is allocated on task managers.

This introduces a new configuration value taskmanager.memory.allocation
that can take values "lazy" or "eager". This controls how memory is
allocated.
aljoscha committed Dec 9, 2015
1 parent 718a17b commit 4f12356
Showing 72 changed files with 234 additions and 552 deletions.
23 changes: 9 additions & 14 deletions docs/quickstart/setup_quickstart.md
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ Download the ready to run binary package. Choose the Flink distribution that __m


## Start

1. Go to the download directory.
2. Unpack the downloaded archive.
3. Start Flink.
@@ -69,9 +69,6 @@ $ bin/start-local.sh # Start Flink
Check the __JobManager's web frontend__ at [http://localhost:8081](http://localhost:8081) and make
sure everything is up and running.

Instead of starting Flink with `bin/start-local.sh` you can also start Flink in a streaming-optimized
mode, using `bin/start-local-streaming.sh`.

## Run Example

Run the __Word Count example__ to see Flink at work.
@@ -80,20 +77,20 @@ Run the __Word Count example__ to see Flink at work.

~~~bash
$ wget -O hamlet.txt http://www.gutenberg.org/cache/epub/1787/pg1787.txt
~~~

* You now have a text file called _hamlet.txt_ in your working directory.
* __Start the example program__:

~~~bash
$ bin/flink run ./examples/WordCount.jar file://`pwd`/hamlet.txt file://`pwd`/wordcount-result.txt
~~~

* You will find a file called __wordcount-result.txt__ in your current directory.


## Cluster Setup

__Running Flink on a cluster__ is as easy as running it locally. Having __passwordless SSH__ and
__the same directory structure__ on all your cluster nodes lets you use our scripts to control
everything.
@@ -106,9 +103,7 @@ on each node of your setup.
3. Add the IPs or hostnames (one per line) of all __worker nodes__ (TaskManager) to the slaves files
in `conf/slaves`.

You can now __start the cluster__ at your master node with `bin/start-cluster.sh`. If you are planning
to run only streaming jobs with Flink, you can also use an optimized streaming mode: `start-cluster-streaming.sh`.

You can now __start the cluster__ at your master node with `bin/start-cluster.sh`.

The following __example__ illustrates the setup with three nodes (with IP addresses from _10.0.0.1_
to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the
@@ -139,9 +134,9 @@ configuration files, which need to be accessible at the same path on all machine
Have a look at the [Configuration]({{ site.baseurl }}/setup/config.html) section of the documentation to see other available configuration options.
For Flink to run efficiently, a few configuration values need to be set.

In particular,

* the amount of available memory per TaskManager (`taskmanager.heap.mb`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`taskmanager.tmp.dirs`)
@@ -150,7 +145,7 @@ In particular,
are very important configuration values.
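For a small test cluster of, say, two 4-core workers (hypothetical numbers — tune them to your machines), these values might be set in `conf/flink-conf.yaml` as:

~~~
taskmanager.heap.mb: 2048          # memory available to each TaskManager
taskmanager.numberOfTaskSlots: 4   # typically one slot per CPU core
parallelism.default: 8             # total number of CPUs in the cluster
taskmanager.tmp.dirs: /tmp/flink   # temporary directories
~~~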

## Flink on YARN
You can easily deploy Flink on your existing __YARN cluster__.

1. Download the __Flink Hadoop2 package__: [Flink with Hadoop 2]({{site.FLINK_DOWNLOAD_URL_HADOOP2_STABLE}})
2. Make sure your __HADOOP_HOME__ (or _YARN_CONF_DIR_ or _HADOOP_CONF_DIR_) __environment variable__ is set to read your YARN and HDFS configuration.
28 changes: 6 additions & 22 deletions docs/setup/cluster_setup.md
@@ -51,7 +51,7 @@ For example, on Ubuntu Linux, type in the following commands to install Java and
ssh:

~~~bash
sudo apt-get install ssh
sudo apt-get install openjdk-7-jre
~~~

@@ -210,9 +210,9 @@ entire Flink directory to every worker node.
Please see the [configuration page](config.html) for details and additional
configuration options.

In particular,

* the amount of available memory per TaskManager (`taskmanager.heap.mb`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`taskmanager.tmp.dirs`)
@@ -236,34 +236,18 @@ bin/start-cluster.sh

To stop Flink, there is also a `stop-cluster.sh` script.


### Starting Flink in the streaming mode

~~~bash
bin/start-cluster-streaming.sh
~~~

The streaming mode changes the startup behavior of Flink: the system does not
bring up the managed memory services with preallocated memory at startup.
Flink streaming does not use the managed memory employed by the batch operators,
so by not starting these services with preallocated memory, streaming jobs
can benefit from more available heap space.

Note that you can still start batch jobs in the streaming mode. The memory manager
will then allocate memory segments from the Java heap as needed.
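The difference between the two startup behaviors can be sketched with a toy model. This is an illustration only, not Flink's actual `MemoryManager`; the class, names, and segment counts are made up for the example:

```python
class ToyMemoryManager:
    """Toy model: eager (preallocated) vs. lazy managed-memory allocation."""

    SEGMENT_SIZE = 32 * 1024  # 32 KiB, mirroring taskmanager.memory.segment-size

    def __init__(self, total_segments, preallocate):
        if preallocate:
            # Eager/batch-style startup: grab all managed memory immediately.
            self.free = [bytearray(self.SEGMENT_SIZE) for _ in range(total_segments)]
            self.unallocated = 0
        else:
            # Lazy/streaming-style startup: the heap stays available for user code.
            self.free = []
            self.unallocated = total_segments

    def request_segment(self):
        if self.free:
            return self.free.pop()  # hand out a preallocated (or returned) segment
        if self.unallocated > 0:
            self.unallocated -= 1
            return bytearray(self.SEGMENT_SIZE)  # allocate from the heap on demand
        raise MemoryError("managed memory budget exhausted")

eager = ToyMemoryManager(4, preallocate=True)
lazy = ToyMemoryManager(4, preallocate=False)
print(len(eager.free), len(lazy.free))  # prints: 4 0
```

Note that batch jobs still run under the lazy variant: `request_segment` simply allocates segments from the heap as needed, mirroring the paragraph above.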

### Optional: Adding JobManager/TaskManager instances to a cluster

You can add both TaskManager or JobManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.

#### Adding a TaskManager
<pre>
bin/taskmanager.sh (start [batch|streaming])|stop|stop-all)
bin/taskmanager.sh start|stop|stop-all
</pre>

#### Adding a JobManager
<pre>
bin/jobmanager.sh (start cluster [batch|streaming])|stop|stop-all)
bin/jobmanager.sh (start (local|cluster))|stop|stop-all
</pre>

Make sure to call these scripts on the hosts, on which you want to start/stop the respective instance.
@@ -281,7 +265,7 @@ setup step is not needed.
The following instructions are a general overview of usual required settings. Please consult one of the
many installation guides available online for more detailed instructions.

__Note that the following instructions are based on Hadoop 1.2 and might differ
for Hadoop 2.__

### Downloading, Installing, and Configuring HDFS
39 changes: 21 additions & 18 deletions docs/setup/config.md
@@ -36,7 +36,7 @@ with format `key: value`.
The system and run scripts parse the config at startup time. Changes to the configuration
file require restarting the Flink JobManager and TaskManagers.

The configuration files for the TaskManagers can be different; Flink does not assume
uniform machines in the cluster.


@@ -66,9 +66,9 @@ contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and
user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager
(including sorting/hashing/caching), so this value should be as
large as possible. If the cluster is exclusively running Flink,
the total amount of available memory per machine minus some memory for the
operating system (maybe 1-2 GB) is a good value.
On YARN setups, this value is automatically configured to the size of
the TaskManager's YARN container, minus a certain tolerance value.

- `taskmanager.numberOfTaskSlots`: The number of parallel operator or
@@ -142,9 +142,12 @@ results outside of the JVM heap. For setups with larger quantities of memory,
this can improve the efficiency of the operations performed on the memory
(DEFAULT: false).

- `taskmanager.memory.segment-size`: The size of memory buffers used by the
memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

- `taskmanager.memory.preallocate`: Can be either `true` or `false`. Specifies whether task
managers should allocate all managed memory when starting up. (DEFAULT: false)
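Put together in `flink-conf.yaml`, these memory options might look like the following (an illustrative fragment showing the stated defaults):

~~~
taskmanager.memory.segment-size: 32768   # 32 KiB buffers
taskmanager.memory.preallocate: false    # allocate managed memory lazily
~~~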


### Kerberos

@@ -203,18 +206,18 @@ available, increase this value (DEFAULT: 2048).

- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts
and Flink's YARN client.
This can be used to set different garbage collectors or to include remote debuggers into
the JVMs running Flink's services.

- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled.

Supported backends:

- `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
- `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ...

- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink-supported filesystem.
Note: the state backend must be accessible from the JobManager; use `file://` only for local setups.
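Putting these keys together, a filesystem-backed checkpoint configuration might look like this (illustrative paths):

~~~
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
~~~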

- `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.

@@ -235,7 +238,7 @@ specify that value on the execution environment. Default value is zero.
### HDFS

These parameters configure the default HDFS used by Flink. Setups that do not
specify an HDFS configuration have to specify the full path to
HDFS files (`hdfs://address:port/path/to/files`). Files will also be written
with default HDFS parameters (block size, replication factor).


@@ -409,10 +412,10 @@ to set the JM host:port manually. It is recommended to leave this option at 1.

- `yarn.heartbeat-delay` (Default: 5 seconds). Time between heartbeats with the ResourceManager.

- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN,
the JobManager's host and the number of available processing slots are written into a properties file,
so that the Flink client is able to pick those details up. This configuration parameter allows
changing the default location of that file (for example, for environments sharing a Flink
installation between users).

- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.`
@@ -436,7 +439,7 @@ In order to use the 'zookeeper' mode, it is mandatory to also define the `recove

- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected

- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.

- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.

@@ -512,12 +515,12 @@ Flink executes a program in parallel by splitting it into subtasks and schedulin

Each Flink TaskManager provides processing slots in the cluster. The number of slots
is typically proportional to the number of available CPU cores __of each__ TaskManager.
As a general recommendation, the number of available CPU cores is a good default for
`taskmanager.numberOfTaskSlots`.
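As a worked example with a hypothetical cluster of three 4-core machines, this recommendation gives:

~~~
taskmanager.numberOfTaskSlots: 4   # one slot per CPU core on each machine
parallelism.default: 12            # 3 TaskManagers x 4 slots = 12 parallel subtasks
~~~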

When starting a Flink application, users can supply the default number of slots to use for that job.
The command line value therefore is called `-p` (for parallelism). In addition, it is possible
to [set the number of slots in the programming APIs]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for
the whole application and individual operators.

<img src="fig/slots_parallelism.svg" class="img-responsive" />
24 changes: 12 additions & 12 deletions docs/setup/jobmanager_high_availability.md
@@ -59,22 +59,22 @@ jobManagerAddressX:webUIPortX

In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:

- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.

<pre>recovery.mode: zookeeper</pre>

- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.

<pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>

Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.

- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all required coordination data is placed.

<pre>recovery.zookeeper.path.root: /flink # important: customize per cluster</pre>

**Important**: if you are running multiple Flink HA clusters, you have to manually configure separate root nodes for each cluster.

- **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.

<pre>
@@ -83,13 +83,13 @@ state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.zookeeper.storageDir: hdfs:///flink/recovery/</pre>

The `storageDir` stores all meta data needed to recover a JobManager failure.

After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting.

#### Example: Standalone Cluster with 2 JobManagers

1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:

<pre>
recovery.mode: zookeeper
recovery.zookeeper.quorum: localhost:2181
@@ -115,10 +115,10 @@ $ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.</pre>

5. **Start an HA-cluster**:

<pre>
$ bin/start-cluster-streaming.sh
Starting HA cluster (streaming mode) with 2 masters and 1 peers in ZooKeeper quorum.
$ bin/start-cluster.sh
Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
Starting jobmanager daemon on host localhost.
Starting jobmanager daemon on host localhost.
Starting taskmanager daemon on host localhost.</pre>
@@ -172,7 +172,7 @@ This means that the application can be restarted 10 times before YARN fails the
#### Example: Highly Available YARN Session

1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:

<pre>
recovery.mode: zookeeper
recovery.zookeeper.quorum: localhost:2181
@@ -193,7 +193,7 @@ $ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.</pre>

5. **Start an HA-cluster**:

<pre>
$ bin/yarn-session.sh -n 2</pre>

3 changes: 0 additions & 3 deletions docs/setup/local_setup.md
@@ -77,9 +77,6 @@ INFO ... - Starting web info server for JobManager on port 8081

The JobManager will also start a web frontend on port 8081, which you can check with your browser at `http://localhost:8081`.

Instead of starting Flink with `bin/start-local.sh` you can also start Flink in a streaming-optimized
mode, using `bin/start-local-streaming.sh`.

## Flink on Windows

If you want to run Flink on Windows you need to download, unpack and configure the Flink archive as mentioned above. After that you can either use the **Windows Batch** file (`.bat`) or use **Cygwin** to run the Flink Jobmanager.

0 comments on commit 4f12356
