Skip to content

Commit

Permalink
[FLINK-20897][table-planner] Support batch mode in StreamTableEnviron…
Browse files Browse the repository at this point in the history
…ment

This enables batch mode for StreamTableEnvironment.

Both StreamExecutionEnvironment, TableEnvironment, and StreamTableEnvironment
use StreamGraphGenerator with the same configuration. Previous work ensured
that when execution.runtime-mode is set to BATCH all batch properties are
either set consistently (e.g. shuffle mode) or have no impact on the pipeline
(e.g. auto watermark interval, state backends).

Most of the changes are removing checks and ensuring that internal (e.g. values)
and external (e.g. data stream, table source) source transformations are set
to BOUNDED. The latter is a complex topic as we currently use 4 different ways
of expressing external sources:

- InputFormatProvider: Boundedness needs to be explicitly set by the planner
due to custom formats that don't extend from FileInputFormat.
- SourceFunctionProvider: Boundedness needs to be explicitly set by the planner
via custom transformation to also disable progressive watermarks.
- DataStreamScanProvider: Boundedness needs to be explicitly set by the planner
to ensure old behavior again. New source interfaces + FileInputFormat are fine.
- TransformationScanProvider: Boundedness can be derived automatically and will
only work with new source interfaces + FileInputFormat.

This closes apache#16793.
  • Loading branch information
twalthr committed Aug 13, 2021
1 parent 17963e7 commit 0875db3
Show file tree
Hide file tree
Showing 27 changed files with 374 additions and 382 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def excluded_methods(cls):
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration'}
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
'generateStreamGraph'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2112,44 +2112,55 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
* This call clears previously registered {@link Transformation transformations}.
* Getter of the {@link StreamGraph} of the streaming job. This call clears previously
* registered {@link Transformation transformations}.
*
* @return The streamgraph representing the transformations
* @return The stream graph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the
* streaming job with the option to clear previously registered {@link Transformation
* transformations}. Clearing the transformations allows, for example, to not re-execute the
* same operations when calling {@link #execute()} multiple times.
* Getter of the {@link StreamGraph} of the streaming job with the option to clear previously
* registered {@link Transformation transformations}. Clearing the transformations allows, for
* example, to not re-execute the same operations when calling {@link #execute()} multiple
* times.
*
* @param clearTransformations Whether or not to clear previously registered transformations
* @return The streamgraph representing the transformations
* @return The stream graph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator().generate();
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
this.transformations.clear();
transformations.clear();
}
return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator() {
/**
* Generates a {@link StreamGraph} that consists of the given {@link Transformation
* transformations} and is configured with the configuration of this environment.
*
* <p>This method does not access or clear the previously registered transformations.
*
* @param transformations list of transformations that the graph should contain
* @return The stream graph representing the transformations
*/
@Internal
public StreamGraph generateStreamGraph(List<Transformation<?>> transformations) {
return getStreamGraphGenerator(transformations).generate();
}

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot execute.");
}

final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
.setRuntimeExecutionMode(executionMode)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ public class StreamGraphGenerator {

private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;

private RuntimeExecutionMode runtimeExecutionMode = RuntimeExecutionMode.STREAMING;

private boolean shouldExecuteInBatchMode;

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -243,12 +241,6 @@ public StreamGraphGenerator(
this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
}

public StreamGraphGenerator setRuntimeExecutionMode(
final RuntimeExecutionMode runtimeExecutionMode) {
this.runtimeExecutionMode = checkNotNull(runtimeExecutionMode);
return this;
}

public StreamGraphGenerator setSavepointDir(Path savepointDir) {
this.savepointDir = savepointDir;
return this;
Expand Down Expand Up @@ -314,7 +306,7 @@ public StreamGraph generate() {
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);

alreadyTransformed = new HashMap<>();
Expand Down Expand Up @@ -460,7 +452,10 @@ private void setFineGrainedGlobalStreamExchangeMode(StreamGraph graph) {
}
}

private boolean shouldExecuteInBatchMode(final RuntimeExecutionMode configuredMode) {
private boolean shouldExecuteInBatchMode() {
final RuntimeExecutionMode configuredMode =
configuration.get(ExecutionOptions.RUNTIME_MODE);

final boolean existsUnboundedSource = existsUnboundedSource();

checkState(
Expand Down
Loading

0 comments on commit 0875db3

Please sign in to comment.