Skip to content

Commit

Permalink
[FLINK-23402][streaming-java] Simplify shuffle mode for batch execution
Browse files Browse the repository at this point in the history
This closes apache#16679.
  • Loading branch information
twalthr committed Aug 4, 2021
1 parent 48526e4 commit de7a00b
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 141 deletions.
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/execution_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>execution.batch-shuffle-mode</h5></td>
<td style="word-wrap: break-word;">ALL_EXCHANGES_BLOCKING</td>
<td><p>Enum</p></td>
<td>Defines how data is exchanged between tasks in batch 'execution.runtime-mode' if the shuffling behavior has not been set explicitly for an individual exchange.<br />With pipelined exchanges, upstream and downstream tasks run simultaneously. In order to achieve lower latency, a result record is immediately sent to and processed by the downstream task. Thus, the receiver back-pressures the sender. The streaming mode always uses this exchange.<br />With blocking exchanges, upstream and downstream tasks run in stages. Records are persisted to some storage between stages. Downstream tasks then fetch these records after the upstream tasks finished. Such an exchange reduces the resources required to execute the job as it does not need to run upstream and downstream tasks simultaneously.<br /><br />Possible values:<ul><li>"ALL_EXCHANGES_PIPELINED": Upstream and downstream tasks run simultaneously. This leads to lower latency and more evenly distributed (but higher) resource usage across tasks.</li><li>"ALL_EXCHANGES_BLOCKING": Upstream and downstream tasks run subsequently. This reduces the resource usage as downstream tasks are started after upstream tasks finished.</li></ul></td>
</tr>
<tr>
<td><h5>execution.buffer-timeout</h5></td>
<td style="word-wrap: break-word;">100 ms</td>
Expand All @@ -26,11 +32,5 @@
<td><p>Enum</p></td>
<td>Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, network shuffle behavior, and time semantics.<br /><br />Possible values:<ul><li>"STREAMING"</li><li>"BATCH"</li><li>"AUTOMATIC"</li></ul></td>
</tr>
<tr>
<td><h5>execution.shuffle-mode</h5></td>
<td style="word-wrap: break-word;">AUTOMATIC</td>
<td><p>Enum</p></td>
<td>Mode that defines how data is exchanged between tasks if the shuffling behavior has not been set explicitly for an individual exchange. The shuffle mode depends on the configured 'execution.runtime-mode' and is only relevant for batch executions on bounded streams.<br />In streaming mode, upstream and downstream tasks run simultaneously to achieve low latency. An exchange is always pipelined (i.e. a result record is immediately sent to and processed by the downstream task). Thus, the receiver back-pressures the sender.<br />In batch mode, upstream and downstream tasks can run in stages. Blocking exchanges persist records to some storage. Downstream tasks then fetch these records after the upstream tasks finished. Such an exchange reduces the resources required to execute the job as it does not need to run upstream and downstream tasks simultaneously.<br /><br />Possible values:<ul><li>"ALL_EXCHANGES_PIPELINED": Upstream and downstream tasks run simultaneously. This leads to lower latency and more evenly distributed (but higher) resource usage across tasks in batch mode. This is the only supported shuffle behavior in streaming mode.</li><li>"ALL_EXCHANGES_BLOCKING": Upstream and downstream tasks run subsequently. This reduces the resource usage in batch mode as downstream tasks are started after upstream tasks finished. This shuffle behavior is not supported in streaming mode.</li><li>"AUTOMATIC": The framework chooses an appropriate shuffle behavior based on the runtime mode and slot assignment.</li></ul></td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,46 @@
import static org.apache.flink.configuration.description.TextElement.text;

/**
* Mode that defines how data is exchanged between tasks if the shuffling behavior has not been set
* explicitly for an individual exchange.
* Defines how data is exchanged between tasks in batch {@link ExecutionOptions#RUNTIME_MODE} if the
* shuffling behavior has not been set explicitly for an individual exchange.
*
* <p>The shuffle mode depends on the configured {@link ExecutionOptions#RUNTIME_MODE} and is only
* relevant for batch executions on bounded streams.
* <p>With pipelined exchanges, upstream and downstream tasks run simultaneously. In order to
* achieve lower latency, a result record is immediately sent to and processed by the downstream
* task. Thus, the receiver back-pressures the sender. The streaming mode always uses this exchange.
*
* <p>In streaming mode, upstream and downstream tasks run simultaneously to achieve low latency. An
* exchange is always pipelined (i.e. a result record is immediately sent to and processed by the
* downstream task). Thus, the receiver back-pressures the sender.
*
* <p>In batch mode, upstream and downstream tasks can run in stages. Blocking exchanges persist
* records to some storage. Downstream tasks then fetch these records after the upstream tasks
* <p>With blocking exchanges, upstream and downstream tasks run in stages. Records are persisted to
* some storage between stages. Downstream tasks then fetch these records after the upstream tasks
* finished. Such an exchange reduces the resources required to execute the job as it does not need
* to run upstream and downstream tasks simultaneously.
*/
@PublicEvolving
public enum ShuffleMode implements DescribedEnum {
public enum BatchShuffleMode implements DescribedEnum {

/**
* Upstream and downstream tasks run simultaneously.
*
* <p>This leads to lower latency and more evenly distributed (but higher) resource usage across
* tasks in batch mode.
*
* <p>This is the only supported shuffle behavior in streaming mode.
* tasks.
*/
ALL_EXCHANGES_PIPELINED(
text(
"Upstream and downstream tasks run simultaneously. This leads to lower latency "
+ "and more evenly distributed (but higher) resource usage across tasks "
+ "in batch mode. This is the only supported shuffle behavior in streaming "
+ "mode.")),
+ "and more evenly distributed (but higher) resource usage across tasks.")),

/**
* Upstream and downstream tasks run subsequently.
*
* <p>This reduces the resource usage in batch mode as downstream tasks are started after
* upstream tasks finished.
*
* <p>This shuffle behavior is not supported in streaming mode.
* <p>This reduces the resource usage as downstream tasks are started after upstream tasks
* finished.
*/
ALL_EXCHANGES_BLOCKING(
text(
"Upstream and downstream tasks run subsequently. This reduces the resource usage "
+ "in batch mode as downstream tasks are started after upstream tasks "
+ "finished. This shuffle behavior is not supported in streaming mode.")),

/**
* The framework chooses an appropriate shuffle behavior based on the {@link
* ExecutionOptions#RUNTIME_MODE} and slot assignment.
*/
AUTOMATIC(
text(
"The framework chooses an appropriate shuffle behavior based on the runtime mode "
+ "and slot assignment."));
+ "as downstream tasks are started after upstream tasks finished."));

private final InlineElement description;

ShuffleMode(InlineElement description) {
BatchShuffleMode(InlineElement description) {
this.description = description;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.ShuffleMode;
import org.apache.flink.configuration.description.Description;

import java.time.Duration;
Expand All @@ -40,30 +40,31 @@ public class ExecutionOptions {
"Runtime execution mode of DataStream programs. Among other things, "
+ "this controls task scheduling, network shuffle behavior, and time semantics.");

public static final ConfigOption<ShuffleMode> SHUFFLE_MODE =
ConfigOptions.key("execution.shuffle-mode")
.enumType(ShuffleMode.class)
.defaultValue(ShuffleMode.AUTOMATIC)
public static final ConfigOption<BatchShuffleMode> BATCH_SHUFFLE_MODE =
ConfigOptions.key("execution.batch-shuffle-mode")
.enumType(BatchShuffleMode.class)
.defaultValue(BatchShuffleMode.ALL_EXCHANGES_BLOCKING)
.withDescription(
Description.builder()
.text(
"Mode that defines how data is exchanged between tasks if the shuffling "
+ "behavior has not been set explicitly for an individual exchange. "
+ "The shuffle mode depends on the configured '%s' and is only "
+ "relevant for batch executions on bounded streams.",
"Defines how data is exchanged between tasks in batch '%s' if the shuffling "
+ "behavior has not been set explicitly for an individual exchange.",
text(RUNTIME_MODE.key()))
.linebreak()
.text(
"In streaming mode, upstream and downstream tasks run simultaneously to achieve low latency. "
+ "An exchange is always pipelined (i.e. a result record is immediately sent to and "
+ "processed by the downstream task). Thus, the receiver back-pressures the sender.")
"With pipelined exchanges, upstream and downstream tasks run simultaneously. "
+ "In order to achieve lower latency, a result record is immediately "
+ "sent to and processed by the downstream task. Thus, the receiver "
+ "back-pressures the sender. The streaming mode always uses this "
+ "exchange.")
.linebreak()
.text(
"In batch mode, upstream and downstream tasks can run in stages. Blocking exchanges persist "
+ "records to some storage. Downstream tasks then fetch these records after the "
+ "upstream tasks finished. Such an exchange reduces the resources required to "
+ "execute the job as it does not need to run upstream and downstream tasks "
+ "simultaneously.")
"With blocking exchanges, upstream and downstream tasks run in stages. "
+ "Records are persisted to some storage between stages. Downstream "
+ "tasks then fetch these records after the upstream tasks finished. "
+ "Such an exchange reduces the resources required to execute the "
+ "job as it does not need to run upstream and downstream "
+ "tasks simultaneously.")
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,10 +989,11 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode));

configuration
.getOptional(ExecutionOptions.SHUFFLE_MODE)
.getOptional(ExecutionOptions.BATCH_SHUFFLE_MODE)
.ifPresent(
shuffleMode ->
this.configuration.set(ExecutionOptions.SHUFFLE_MODE, shuffleMode));
this.configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode));

configuration
.getOptional(ExecutionOptions.SORT_INPUTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.streaming.api.graph;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.ShuffleMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
Expand Down Expand Up @@ -394,12 +394,11 @@ private void configureStreamGraphStreaming(final StreamGraph graph) {
}

private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeBatch() {
final ShuffleMode shuffleMode = configuration.get(ExecutionOptions.SHUFFLE_MODE);
final BatchShuffleMode shuffleMode = configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
switch (shuffleMode) {
case ALL_EXCHANGES_PIPELINED:
return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
case ALL_EXCHANGES_BLOCKING:
case AUTOMATIC:
return GlobalStreamExchangeMode.ALL_EDGES_BLOCKING;
default:
throw new IllegalArgumentException(
Expand All @@ -410,21 +409,11 @@ private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeBatch() {
}

private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeStreaming() {
final ShuffleMode shuffleMode = configuration.get(ExecutionOptions.SHUFFLE_MODE);
switch (shuffleMode) {
case ALL_EXCHANGES_PIPELINED:
case AUTOMATIC:
if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
checkApproximateLocalRecoveryCompatibility();
return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
}
return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
default:
throw new IllegalArgumentException(
String.format(
"Unsupported shuffle mode '%s' in STREAMING runtime mode.",
shuffleMode.toString()));
if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
checkApproximateLocalRecoveryCompatibility();
return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
}
return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
}

private void checkApproximateLocalRecoveryCompatibility() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.ShuffleMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
Expand All @@ -30,7 +30,6 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -76,7 +75,6 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
* Tests for generating correct properties for sorting inputs in {@link RuntimeExecutionMode#BATCH}
Expand All @@ -90,39 +88,20 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
public void testShuffleMode() {
testGlobalStreamExchangeMode(
RuntimeExecutionMode.AUTOMATIC,
ShuffleMode.AUTOMATIC,
BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

testGlobalStreamExchangeMode(
RuntimeExecutionMode.STREAMING,
ShuffleMode.AUTOMATIC,
BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);

testGlobalStreamExchangeMode(
RuntimeExecutionMode.BATCH,
ShuffleMode.AUTOMATIC,
GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

testGlobalStreamExchangeMode(
RuntimeExecutionMode.BATCH,
ShuffleMode.ALL_EXCHANGES_PIPELINED,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED,
GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
}

@Test
public void testInvalidShuffleMode() {
try {
testGlobalStreamExchangeMode(
RuntimeExecutionMode.STREAMING, ShuffleMode.ALL_EXCHANGES_BLOCKING, null);
fail();
} catch (IllegalArgumentException e) {
assertThat(
e,
FlinkMatchers.containsMessage(
"Unsupported shuffle mode 'ALL_EXCHANGES_BLOCKING' in STREAMING runtime mode."));
}
}

@Test
public void testBatchJobType() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -532,11 +511,11 @@ private void testNoSupportForIterationsInBatchHelper(

private void testGlobalStreamExchangeMode(
RuntimeExecutionMode runtimeExecutionMode,
ShuffleMode shuffleMode,
BatchShuffleMode shuffleMode,
GlobalStreamExchangeMode expectedStreamExchangeMode) {
final Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
configuration.set(ExecutionOptions.SHUFFLE_MODE, shuffleMode);
configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSink<Integer> sink = addDummyPipeline(env);
final StreamGraphGenerator graphGenerator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public class ExecutionConfigOptions {
+ "\"SortMergeJoin\", \"HashAgg\", \"SortAgg\".\n"
+ "By default no operator is disabled.");

/** @deprecated Use {@link ExecutionOptions#SHUFFLE_MODE} instead. */
/** @deprecated Use {@link ExecutionOptions#BATCH_SHUFFLE_MODE} instead. */
@Deprecated
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
public static final ConfigOption<String> TABLE_EXEC_SHUFFLE_MODE =
Expand Down
Loading

0 comments on commit de7a00b

Please sign in to comment.