From 94aaac49a3179d4c020a86d42ccd767b3f045816 Mon Sep 17 00:00:00 2001 From: Kostas Kloudas Date: Thu, 19 Nov 2020 13:02:41 +0100 Subject: [PATCH] [FLINK-20240] Add execution.runtime-mode setter in StreamExecutionEnvironment This closes #14137. --- .../api/common/RuntimeExecutionMode.java | 4 ++-- ...ream_execution_environment_completeness.py | 3 ++- .../StreamExecutionEnvironment.java | 20 +++++++++++++++++++ .../scala/StreamExecutionEnvironment.scala | 20 +++++++++++++++++++ .../DataStreamBatchExecutionITCase.java | 10 ++-------- .../test/streaming/runtime/SinkITCase.java | 9 ++------- 6 files changed, 48 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java b/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java index 5611fceed2681..5c882d8cab485 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java @@ -17,7 +17,7 @@ package org.apache.flink.api.common; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; /** * Runtime execution mode of DataStream programs. 
Among other things, this controls task scheduling, @@ -27,7 +27,7 @@ * @see * https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API */ -@Internal +@PublicEvolving public enum RuntimeExecutionMode { /** diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index 8cb4c5a3eae20..08262276e65ed 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -48,7 +48,8 @@ def excluded_methods(cls): 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence"} + 'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence", + 'setRuntimeMode'} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9a30d5dc59e71..2bf27bf5f66c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -284,6 +284,26 @@ public StreamExecutionEnvironment setParallelism(int parallelism) { return this; } + /** + * Sets the runtime execution mode for the application (see {@link RuntimeExecutionMode}). + * This is equivalent to setting the {@code execution.runtime-mode} in your application's + * configuration file. + * + *

<p>We recommend users to NOT use this method but set the {@code execution.runtime-mode} + * using the command-line when submitting the application. Keeping the application code + * configuration-free allows for more flexibility as the same application will be able to + * be executed in any execution mode. + * + * @param executionMode the desired execution mode. + * @return The execution environment of your application. + */ + @PublicEvolving + public StreamExecutionEnvironment setRuntimeMode(final RuntimeExecutionMode executionMode) { + checkNotNull(executionMode); + configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode); + return this; + } + /** * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) * is Short.MAX_VALUE. diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 43fd7dbaedf51..6bb2acabbcb33 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import com.esotericsoftware.kryo.Serializer import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving} +import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration @@ -75,6 +76,25 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { javaEnv.setParallelism(parallelism) } + /** + * Sets the runtime execution mode for the application (see [[RuntimeExecutionMode]]). 
+ * This is equivalent to setting the "execution.runtime-mode" in your application's + * configuration file. + * + * We recommend users to NOT use this method but set the "execution.runtime-mode" + * using the command-line when submitting the application. Keeping the application code + * configuration-free allows for more flexibility as the same application will be able to + * be executed in any execution mode. + * + * @param executionMode the desired execution mode. + * @return The execution environment of your application. + */ + @PublicEvolving + def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment = { + javaEnv.setRuntimeMode(executionMode) + this + } + /** * Sets the maximum degree of parallelism defined for the program. * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java index e9617c08f70c9..dd8c6744aa74e 100644 --- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -156,12 +154,8 @@ public void batchSumSingleResultPerKey() throws Exception { } private StreamExecutionEnvironment getExecutionEnvironment() { - - Configuration config = new 
Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment( - config); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.setParallelism(1); // trick the collecting sink into working even in the face of failures 🙏 diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index ececbd31a3f2c..ac55c0277de35 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -20,8 +20,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.IntegerTypeInfo; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.runtime.operators.sink.TestSink; import org.apache.flink.streaming.util.FiniteTestSource; @@ -235,17 +233,14 @@ private static List getSplittedGlobalCommittedData() { private StreamExecutionEnvironment buildStreamEnv() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final Configuration config = new Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING); + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.enableCheckpointing(100); return env; } private StreamExecutionEnvironment buildBatchEnv() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final Configuration config = new Configuration(); - 
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); - env.configure(config, this.getClass().getClassLoader()); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); return env; } }