Skip to content

Commit

Permalink
[FLINK-20240] Add execution.runtime-mode setter in StreamExecutionEnv…
Browse files Browse the repository at this point in the history
…ironment

This closes apache#14137.
  • Loading branch information
kl0u committed Nov 20, 2020
1 parent 7e05b33 commit 94aaac4
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.api.common;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

/**
* Runtime execution mode of DataStream programs. Among other things, this controls task scheduling,
Expand All @@ -27,7 +27,7 @@
* @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
*/
@Internal
@PublicEvolving
public enum RuntimeExecutionMode {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def excluded_methods(cls):
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence"}
'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence",
'setRuntimeMode'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ public StreamExecutionEnvironment setParallelism(int parallelism) {
return this;
}

/**
* Sets the runtime execution mode for the application (see {@link RuntimeExecutionMode}).
* This is equivalent to setting the {@code execution.runtime-mode} in your application's
* configuration file.
*
* <p>We recommend users to NOT use this method but set the {@code execution.runtime-mode}
* using the command-line when submitting the application. Keeping the application code
* configuration-free allows for more flexibility as the same application will be able to
* be executed in any execution mode.
*
* @param executionMode the desired execution mode.
* @return The execution environment of your application.
*/
@PublicEvolving
public StreamExecutionEnvironment setRuntimeMode(final RuntimeExecutionMode executionMode) {
checkNotNull(executionMode);
configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode);
return this;
}

/**
* Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)
* is Short.MAX_VALUE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
Expand Down Expand Up @@ -75,6 +76,25 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.setParallelism(parallelism)
}

/**
* Sets the runtime execution mode for the application (see [[RuntimeExecutionMode]]).
* This is equivalent to setting the "execution.runtime-mode" in your application's
* configuration file.
*
* We recommend users to NOT use this method but set the "execution.runtime-mode"
* using the command-line when submitting the application. Keeping the application code
* configuration-free allows for more flexibility as the same application will be able to
* be executed in any execution mode.
*
* @param executionMode the desired execution mode.
* @return The execution environment of your application.
*/
@PublicEvolving
def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment = {
javaEnv.setRuntimeMode(executionMode)
this
}

/**
* Sets the maximum degree of parallelism defined for the program.
* The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -156,12 +154,8 @@ public void batchSumSingleResultPerKey() throws Exception {
}

private StreamExecutionEnvironment getExecutionEnvironment() {

Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(
config);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(1);

// trick the collecting sink into working even in the face of failures 🙏
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.util.FiniteTestSource;
Expand Down Expand Up @@ -235,17 +233,14 @@ private static List<String> getSplittedGlobalCommittedData() {

private StreamExecutionEnvironment buildStreamEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(100);
return env;
}

private StreamExecutionEnvironment buildBatchEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
env.configure(config, this.getClass().getClassLoader());
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
return env;
}
}

0 comments on commit 94aaac4

Please sign in to comment.