Skip to content

Commit

Permalink
[FLINK-19583] Expose the execution.runtime-mode to users
Browse files Browse the repository at this point in the history
As part of FLIP-134, this PR exposes the 'execution.runtime-mode'
to the users. This options allows users to specify, among other
things, the task scheduling, network shuffle behavior, and the
time semantics.

This closes apache#13656
  • Loading branch information
kl0u committed Oct 16, 2020
1 parent 3a9a930 commit b3fd487
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 32 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/execution_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,11 @@
<td>Boolean</td>
<td>Tells if we should use compression for the state snapshot data or not</td>
</tr>
<tr>
<td><h5>execution.runtime-mode</h5></td>
<td style="word-wrap: break-word;">STREAMING</td>
<td><p>Enum</p>Possible values: [STREAMING, BATCH, AUTOMATIC]</td>
<td>Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, network shuffle behavior, and time semantics.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api;
package org.apache.flink.api.common;

import org.apache.flink.annotation.Internal;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

Expand All @@ -31,6 +32,13 @@
@PublicEvolving
public class ExecutionOptions {

public static final ConfigOption<RuntimeExecutionMode> RUNTIME_MODE =
ConfigOptions.key("execution.runtime-mode")
.enumType(RuntimeExecutionMode.class)
.defaultValue(RuntimeExecutionMode.STREAMING)
.withDescription("Runtime execution mode of DataStream programs. Among other things, " +
"this controls task scheduling, network shuffle behavior, and time semantics.");

/**
* Should be moved to {@code ExecutionCheckpointingOptions} along with
* {@code ExecutionConfig#useSnapshotCompression}, which should be put into {@code CheckpointConfig}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
Expand Down Expand Up @@ -790,14 +791,18 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
this.cacheFile.clear();
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
});
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
configuration.getOptional(ExecutionOptions.RUNTIME_MODE)
.ifPresent(runtimeMode ->
this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode)
);
configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(
sortInputs -> this.getConfiguration().set(ExecutionOptions.SORT_INPUTS, sortInputs)
);
configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(
sortInputs -> this.getConfiguration().set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs)
);
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
}

private void registerCustomListeners(final ClassLoader classLoader, final List<String> listeners) {
Expand Down Expand Up @@ -1921,7 +1926,12 @@ private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}

final RuntimeExecutionMode executionMode =
configuration.get(ExecutionOptions.RUNTIME_MODE);

return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
.setRuntimeExecutionMode(executionMode)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
Expand All @@ -32,7 +33,6 @@
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.RuntimeExecutionMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
Expand Down Expand Up @@ -286,19 +286,28 @@ private void setBatchStateBackendAndTimerService(StreamGraph graph) {
}

private boolean shouldExecuteInBatchMode(final RuntimeExecutionMode configuredMode) {
final boolean existsUnboundedSource = existsUnboundedSource();

checkState(configuredMode != RuntimeExecutionMode.BATCH || !existsUnboundedSource,
"Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. " +
"This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() +
"' to STREAMING or AUTOMATIC");

if (checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
return configuredMode == RuntimeExecutionMode.BATCH;
}
return !existsUnboundedSource;
}

final boolean continuousSourceExists = transformations
private boolean existsUnboundedSource() {
return transformations
.stream()
.anyMatch(transformation ->
isUnboundedSource(transformation) ||
transformation
.getTransitivePredecessors()
.stream()
.anyMatch(this::isUnboundedSource));
return !continuousSourceExists;
transformation
.getTransitivePredecessors()
.stream()
.anyMatch(this::isUnboundedSource));
}

private boolean isUnboundedSource(final Transformation<?> transformation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
Expand Down
Loading

0 comments on commit b3fd487

Please sign in to comment.