Skip to content

Commit

Permalink
[FLINK-24899][network] Enable data compression for blocking shuffle b…
Browse files Browse the repository at this point in the history
…y default

This closes apache#17814.
  • Loading branch information
SteNicholas authored and wsry committed Jan 18, 2022
1 parent f2d03b0 commit a14c554
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/ops/batch/blocking_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile``mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582)[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)

当使用sort blocking shuffle的时候有些配置需要适配:
- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。
- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): 配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): 配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/ops/batch/blocking_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ To further improve the performance, for most jobs we also recommend [enabling co
`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read data sequentially, sort shuffle can achieve better performance than hash shuffle, especially on HDD. Additionally, `sort shuffle` uses extra managed memory as data reading buffer and does not rely on `sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref "docs/deployment/security/security-ssl" >}}). Please refer to [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and [FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.

There are several config options that might need adjustment when using sort blocking shuffle:
- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low.
- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low. Defaults to false for 1.14 and lower, and true for 1.15 and higher.
- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used.
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<tbody>
<tr>
<td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
</tr>
Expand Down Expand Up @@ -144,7 +144,7 @@
<td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
<td style="word-wrap: break-word;">2147483647</td>
<td>Integer</td>
<td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to enable data compression by setting 'taskmanager.network.blocking-shuffle.compression.enabled' to true and tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
<td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
</tr>
<tr>
<td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
</tr>
Expand Down Expand Up @@ -132,7 +132,7 @@
<td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
<td style="word-wrap: break-word;">2147483647</td>
<td>Integer</td>
<td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to enable data compression by setting 'taskmanager.network.blocking-shuffle.compression.enabled' to true and tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
<td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public class NettyShuffleEnvironmentOptions {
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Boolean> BLOCKING_SHUFFLE_COMPRESSION_ENABLED =
key("taskmanager.network.blocking-shuffle.compression.enabled")
.defaultValue(false)
.booleanType()
.defaultValue(true)
.withDescription(
"Boolean flag indicating whether the shuffle data will be compressed "
+ "for blocking shuffle mode. Note that data is compressed per "
Expand Down Expand Up @@ -232,9 +233,7 @@ public class NettyShuffleEnvironmentOptions {
+ " batch jobs of large parallelism, the sort-merge one"
+ " will be used. Note: For production usage, if sort-"
+ "merge blocking shuffle is enabled, you may also need"
+ " to enable data compression by setting '%s' to true "
+ "and tune '%s' and '%s' for better performance.",
BLOCKING_SHUFFLE_COMPRESSION_ENABLED.key(),
+ " to tune '%s' and '%s' for better performance.",
NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
// raw string key is used here to avoid interdependence, a test
// is implemented to guard that when the target key is modified,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ public void testSortMergeShuffleConfigOptionsCorrelation() {
String description = formatter.format(configOption.description());

String configKey =
getConfigKey(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED);
assertTrue(description.contains(configKey));
configKey = getConfigKey(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
getConfigKey(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
assertTrue(description.contains(configKey));
configKey = getConfigKey(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY);
assertTrue(description.contains(configKey));

assertTrue(
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED.defaultValue());
}

private static String getConfigKey(ConfigOption<?> configOption) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ public static Boolean[] params() {
}

@Test
public void testDataCompressionForBoundedBlockingShuffle() throws Exception {
public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception {
Configuration configuration = new Configuration();
configuration.setBoolean(
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));

JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
}

@Test
public void testDataCompressionForSortMergeBlockingShuffle() throws Exception {
public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception {
Configuration configuration = new Configuration();
configuration.setBoolean(
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
Expand Down

0 comments on commit a14c554

Please sign in to comment.