diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1108f79283161..20f942f506954 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -165,6 +165,16 @@ public ExecutionEnvironment( this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(configuration); this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader; + + // the parallelism of a job or an operator can only be specified at the following places: + // i) at the operator level using the SingleOutputStreamOperator.setParallelism(). + // ii) programmatically by using the env.setParallelism() method + // + // if specified in multiple places, the priority order is the above. + // + // Given this, it is safe to overwrite the execution config default value here because all other ways assume + // that the env is already instantiated so they will overwrite the value passed here. + this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM)); } /** @@ -895,8 +905,6 @@ public final JobClient executeAsync() throws Exception { public JobClient executeAsync(String jobName) throws Exception { checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file."); - consolidateParallelismDefinitionsInConfiguration(); - final Plan plan = createProgramPlan(jobName); final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration); @@ -923,12 +931,6 @@ public JobClient executeAsync(String jobName) throws Exception { } } - private void consolidateParallelismDefinitionsInConfiguration() { - if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) { - configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism); - } - } - /** * Creates the plan with which the system will execute the program, and returns it as * a String using a JSON representation of the execution data flow graph. diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java index 6939ba30c6359..354be7a0b11b2 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.HandlerRequest; @@ -301,7 +302,7 @@ private static List getValuesAsString(MessageQueryParameter param JobGraph validateDefaultGraph() { JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null); Assert.assertEquals(0, ParameterProgram.actualArguments.length); - Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism()); + Assert.assertEquals(CoreOptions.DEFAULT_PARALLELISM.defaultValue().intValue(), getExecutionConfig(jobGraph).getParallelism()); return jobGraph; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9d5ba4ab31770..49a8fe915790e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -207,6 +207,17 @@ public StreamExecutionEnvironment( this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(configuration); this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader; + + // the parallelism of a job or an operator can only be specified at the following places: + // i) at the operator level using the SingleOutputStreamOperator.setParallelism(). + // ii) programmatically by using the env.setParallelism() method, or + // iii) in the configuration passed here + // + // if specified in multiple places, the priority order is the above. + // + // Given this, it is safe to overwrite the execution config default value here because all other ways assume + // that the env is already instantiated so they will overwrite the value passed here. + this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM)); } protected Configuration getConfiguration() { @@ -1729,8 +1740,6 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception { checkNotNull(streamGraph, "StreamGraph cannot be null."); checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file."); - consolidateParallelismDefinitionsInConfiguration(); - final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration); @@ -1756,12 +1765,6 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception { } } - private void consolidateParallelismDefinitionsInConfiguration() { - if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) { - configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism); - } - } - /** * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call * clears previously registered {@link Transformation transformations}.