Skip to content

Commit

Permalink
[FLINK-4545] [network] replace the network buffers parameter
Browse files Browse the repository at this point in the history
Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

This closes apache#3721
  • Loading branch information
Nico Kruber authored and StephanEwen committed May 6, 2017
1 parent ac72450 commit 0bb49e5
Show file tree
Hide file tree
Showing 26 changed files with 1,477 additions and 103 deletions.
80 changes: 65 additions & 15 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to

### Managed Memory

By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
By default, Flink allocates a fraction of `0.7` of the free memory (total memory configured via `taskmanager.heap.mb` minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.

- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on `taskmanager.memory.off-heap`) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by `taskmanager.memory.fraction`. (DEFAULT: -1)

- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.

- `taskmanager.memory.off-heap`: If set to `true`, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).

Expand Down Expand Up @@ -174,7 +174,11 @@ will be used under the directory specified by jobmanager.web.tmpdir.

- `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)

- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048).
- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)

- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)

- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)

- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
- `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
Expand Down Expand Up @@ -257,11 +261,17 @@ The following parameters configure Flink's JobManager and TaskManagers.

- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: **The system's tmp dir**).

- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**).
- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that `taskmanager.network.memory.min` and `taskmanager.network.memory.max` may override this fraction. (DEFAULT: **0.1**)

- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: **64 MB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.

- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: **1 GB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.

- `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.

- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: **-1**)

- `taskmanager.memory.fraction`: The relative amount of memory that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that TaskManagers reserve 80% of the JVM's heap space for internal data buffers, leaving 20% of the JVM's heap space free for objects created by user-defined functions. (DEFAULT: **0.7**) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.

- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.

Expand Down Expand Up @@ -614,26 +624,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated

## Background


### Configuring the Network Buffers

If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
need to adapt the amount of memory used for network buffers in order for your program to run on your
task managers.

Network buffers are a critical resource for the communication layers. They are used to buffer
records before transmission over a network, and to buffer incoming data before dissecting it into
records and handing them to the application. A sufficient number of network buffers is critical to
achieve a good throughput.

<div class="alert alert-info">
Since Flink 1.3, you may follow the idiom "more is better" without any penalty on the latency (we
prevent excessive buffering in each outgoing and incoming channel, i.e. *buffer bloat*, by limiting
the actual number of buffers used by each channel).
</div>

In general, configure the task manager to have enough buffers that each logical network connection
you expect to be open at the same time has a dedicated buffer. A logical network connection exists
for each point-to-point exchange of data over the network, which typically happens at
repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the
TaskManager has to be able to talk to all other parallel tasks.

#### Setting Memory Fractions

Previously, the number of network buffers was set manually which became a quite error-prone task
(see below). Since Flink 1.3, it is possible to define a fraction of memory that is being used for
network buffers with the following configuration parameters:

- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
- `taskmanager.memory.segment-size`: Size of memory buffers used by the memory manager and the
network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

#### Setting the Number of Network Buffers directly

<div class="alert alert-warning">
<strong>Note:</strong> This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.
</div>

The required number of buffers on a task manager is
*total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*
with *n* being a constant that defines how many repartitioning-/broadcasting steps you expect to be
active at the same time. Since the *intra-node-parallelism* is typically the number of cores, and
more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently
boils down to

```
#slots-per-TM^2 * #TMs * 4
```

Where `#slots per TM` are the [number of slots per TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` are the total number of task managers.

Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the
application. A sufficient number of network buffers is critical to achieve a good throughput.

In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning- or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time.

Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to `#slots-per-TM^2 * #TMs * 4`.

To support for example a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.
To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network
buffers for optimal throughput.

Each network buffer has by default a size of 32 KiBytes. In the above example, the system would allocate roughly 300 MiBytes for network buffers.
Each network buffer has by default a size of 32 KiBytes. In the example above, the system would thus
allocate roughly 300 MiBytes for network buffers.

The number and size of network buffers can be configured with the following parameters:

Expand Down
2 changes: 1 addition & 1 deletion docs/setup/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ The system will use the configuration in `conf/flink-conf.yaml`. Please follow o

Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.

If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.

The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
Expand All @@ -36,6 +40,7 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -74,6 +79,12 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
public static void activateHBaseCluster() throws IOException {
registerHBaseMiniClusterInClasspath();
prepareTable();
LimitNetworkBuffersTestEnvironment.setAsContext();
}

@AfterClass
public static void resetExecutionEnvironmentFactory() {
LimitNetworkBuffersTestEnvironment.unsetAsContext();
}

private static void prepareTable() throws IOException {
Expand Down Expand Up @@ -335,4 +346,30 @@ public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exc
}


/**
* Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
* configuration that limits the maximum memory used for network buffers since the current
* defaults are too high for Travis-CI.
*/
private static abstract class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {

public static void setAsContext() {
Configuration config = new Configuration();
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
final LocalEnvironment le = new LocalEnvironment(config);

initializeContextEnvironment(new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
return le;
}
});
}

public static void unsetAsContext() {
resetContextEnvironment();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,30 @@ public class TaskManagerOptions {
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);

/** Fraction of JVM memory to use for network buffers. */
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f);

/** Minimum memory size for network buffers (in bytes) */
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue(64L << 20); // 64 MB

/** Maximum memory size for network buffers (in bytes) */
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue(1024L << 20); // 1 GB


/** Minimum backoff for partition requests of input channels. */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
Expand All @@ -101,6 +120,7 @@ public class TaskManagerOptions {
key("taskmanager.net.request-backoff.max")
.defaultValue(10000);


/**
* Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
*
Expand Down
Loading

0 comments on commit 0bb49e5

Please sign in to comment.