Skip to content

Commit

Permalink
[FLINK-15533] Consolidate parallelism in Environment constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jan 14, 2020
1 parent 6cf5940 commit 704b7d9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ public ExecutionEnvironment(
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(configuration);
this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader;

// the parallelism of a job or an operator can only be specified at the following places:
// i) at the operator level using the SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using the env.setParallelism() method
//
// if specified in multiple places, the priority order is the above.
//
// Given this, it is safe to overwrite the execution config default value here because all other ways assume
// that the env is already instantiated so they will overwrite the value passed here.
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
}

/**
Expand Down Expand Up @@ -895,8 +905,6 @@ public final JobClient executeAsync() throws Exception {
public JobClient executeAsync(String jobName) throws Exception {
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

consolidateParallelismDefinitionsInConfiguration();

final Plan plan = createProgramPlan(jobName);
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
Expand All @@ -923,12 +931,6 @@ public JobClient executeAsync(String jobName) throws Exception {
}
}

private void consolidateParallelismDefinitionsInConfiguration() {
if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
}
}

/**
* Creates the plan with which the system will execute the program, and returns it as
* a String using a JSON representation of the execution data flow graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand Down Expand Up @@ -301,7 +302,7 @@ private static <X> List<String> getValuesAsString(MessageQueryParameter<X> param
JobGraph validateDefaultGraph() {
JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
Assert.assertEquals(0, ParameterProgram.actualArguments.length);
Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
Assert.assertEquals(CoreOptions.DEFAULT_PARALLELISM.defaultValue().intValue(), getExecutionConfig(jobGraph).getParallelism());
return jobGraph;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ public StreamExecutionEnvironment(
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(configuration);
this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader;

// the parallelism of a job or an operator can only be specified at the following places:
// i) at the operator level using the SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using the env.setParallelism() method, or
// iii) in the configuration passed here
//
// if specified in multiple places, the priority order is the above.
//
// Given this, it is safe to overwrite the execution config default value here because all other ways assume
// that the env is already instantiated so they will overwrite the value passed here.
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
}

protected Configuration getConfiguration() {
Expand Down Expand Up @@ -1729,8 +1740,6 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

consolidateParallelismDefinitionsInConfiguration();

final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

Expand All @@ -1756,12 +1765,6 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
}
}

private void consolidateParallelismDefinitionsInConfiguration() {
if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
}
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
* clears previously registered {@link Transformation transformations}.
Expand Down

0 comments on commit 704b7d9

Please sign in to comment.