Skip to content

Commit

Permalink
[FLINK-3589] Allow setting Operator parallelism to default value
Browse files Browse the repository at this point in the history
Adds the public field ExecutionConfig.PARALLELISM_DEFAULT as a flag
value to indicate that the default parallelism should be used.

Adds the public field ExecutionConfig.PARALLELISM_UNKNOWN as a flag
value to indicate that the parallelism should remain unchanged.

This closes apache#1778
  • Loading branch information
greghogan committed Apr 13, 2016
1 parent 6c06168 commit 5350bc4
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
Expand Down Expand Up @@ -206,7 +207,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
*/
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();

Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
pc.setDefaultParallelism(parallelism);
Expand Down Expand Up @@ -271,7 +272,7 @@ public static JobExecutionResult execute(Plan plan) throws Exception {
* @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();

Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
pc.setDefaultParallelism(parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.client.cli;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.ExecutionConfig;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -103,7 +104,7 @@ else if (args.length > 0) {
}
}
else {
parallelism = -1;
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
}

stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client.program;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void startNewSession() throws Exception {

@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,23 @@ public class ExecutionConfig implements Serializable {

/**
* The constant to use for the parallelism, if the system should use the number
* of currently available slots.
* of currently available slots.
*/
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

/**
* The flag value indicating use of the default parallelism. This value can
* be used to reset the parallelism back to the default state.
*/
public static final int PARALLELISM_DEFAULT = -1;

/**
* The flag value indicating an unknown or unset parallelism. This value is
* not a valid parallelism and indicates that the parallelism should remain
* unchanged.
*/
public static final int PARALLELISM_UNKNOWN = -2;

private static final long DEFAULT_RESTART_DELAY = 10000L;

// --------------------------------------------------------------------------------------------
Expand All @@ -80,7 +93,7 @@ public class ExecutionConfig implements Serializable {

private boolean useClosureCleaner = true;

private int parallelism = -1;
private int parallelism = PARALLELISM_DEFAULT;

/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
Expand Down Expand Up @@ -212,7 +225,8 @@ public long getAutoWatermarkInterval() {
* with a parallelism of one (the final reduce to the single result value).
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environment's default parallelism should be used.
* returns {@link #PARALLELISM_DEFAULT} if the environment's default parallelism
* should be used.
*/
public int getParallelism() {
return parallelism;
Expand All @@ -231,11 +245,13 @@ public int getParallelism() {
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
if (parallelism < 1 && parallelism != -1) {
throw new IllegalArgumentException(
"Parallelism must be at least one, or -1 (use system default).");
if (parallelism != PARALLELISM_UNKNOWN) {
if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
throw new IllegalArgumentException(
"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}
this.parallelism = parallelism;
}
this.parallelism = parallelism;
return this;
}

Expand Down
17 changes: 7 additions & 10 deletions flink-core/src/main/java/org/apache/flink/api/common/Plan.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
@Internal
public class Plan implements Visitable<Operator<?>> {

/** The default parallelism indicates to use the cluster's default */
private static final int DEFAULT_PARALELLISM = -1;

/**
* A collection of all sinks in the plan. Since the plan is traversed from the sinks to the sources, this
* collection must contain all the sinks.
Expand All @@ -65,7 +62,7 @@ public class Plan implements Visitable<Operator<?>> {
protected String jobName;

/** The default parallelism to use for nodes that have no explicitly specified parallelism. */
protected int defaultParallelism = DEFAULT_PARALELLISM;
protected int defaultParallelism = ExecutionConfig.PARALLELISM_DEFAULT;

/** Hash map for files in the distributed cache: registered name to cache entry. */
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<>();
Expand All @@ -91,7 +88,7 @@ public class Plan implements Visitable<Operator<?>> {
* @param jobName The name to display for the job.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, String jobName) {
this(sinks, jobName, DEFAULT_PARALELLISM);
this(sinks, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
}

/**
Expand Down Expand Up @@ -122,7 +119,7 @@ public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, String jobName,
* @param jobName The name to display for the job.
*/
public Plan(GenericDataSinkBase<?> sink, String jobName) {
this(sink, jobName, DEFAULT_PARALELLISM);
this(sink, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
}

/**
Expand Down Expand Up @@ -152,7 +149,7 @@ public Plan(GenericDataSinkBase<?> sink, String jobName, int defaultParallelism)
* @param sinks The collection will the sinks of the data flow.
*/
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks) {
this(sinks, DEFAULT_PARALELLISM);
this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
}

/**
Expand Down Expand Up @@ -180,7 +177,7 @@ public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, int defaultParal
* @param sink The data sink of the data flow.
*/
public Plan(GenericDataSinkBase<?> sink) {
this(sink, DEFAULT_PARALELLISM);
this(sink, ExecutionConfig.PARALLELISM_DEFAULT);
}

/**
Expand Down Expand Up @@ -287,8 +284,8 @@ public int getDefaultParallelism() {
* @param defaultParallelism The default parallelism for the plan.
*/
public void setDefaultParallelism(int defaultParallelism) {
checkArgument(defaultParallelism >= 1 || defaultParallelism == -1,
"The default parallelism must be positive, or -1 if the system should use the globally configured default.");
checkArgument(defaultParallelism >= 1 || defaultParallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The default parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT if the system should use the globally configured default.");

this.defaultParallelism = defaultParallelism;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
Expand All @@ -42,7 +43,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {

protected String name; // the name of the contract instance. optional.

private int parallelism = -1; // the number of parallel instances to use. -1, if unknown
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use

/**
* The return type of the user function.
Expand Down Expand Up @@ -162,9 +163,10 @@ public void setParameter(String key, boolean value) {
}

/**
* Gets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. If this
* value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
* Gets the parallelism for this contract instance. The parallelism denotes how many
* parallel instances of the user function will be spawned during the execution. If this
* value is {@link ExecutionConfig#PARALLELISM_DEFAULT}, then the system will decide the
* number of parallel instances by itself.
*
* @return The parallelism.
*/
Expand All @@ -174,10 +176,10 @@ public int getParallelism() {

/**
* Sets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. Set this
* value to <code>-1</code> to let the system decide on its own.
* how many parallel instances of the user function will be spawned during the execution.
*
* @param parallelism The number of parallel instances to spawn. -1, if unspecified.
* @param parallelism The number of parallel instances to spawn. Set this value to
* {@link ExecutionConfig#PARALLELISM_DEFAULT} to let the system decide on its own.
*/
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.flink.api.common;

import static org.junit.Assert.*;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ExecutionConfigTest {

Expand All @@ -45,4 +46,32 @@ public void testDoubleTypeRegistration() {

assertTrue(counter == expectedTypes.size());
}

@Test
public void testConfigurationOfParallelism() {
ExecutionConfig config = new ExecutionConfig();

// verify that PARALLELISM_UNKNOWN does not change initial parallelism
int parallelism = config.getParallelism();
config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);

assertEquals(parallelism, config.getParallelism());

// verify explicit change in parallelism
parallelism = 36;
config.setParallelism(parallelism);

assertEquals(parallelism, config.getParallelism());

// verify that PARALLELISM_UNKNOWN does not change configured parallelism
config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);

assertEquals(parallelism, config.getParallelism());

// verify that parallelism is reset to default flag value
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
config.setParallelism(parallelism);

assertEquals(parallelism, config.getParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public ExecutionConfig getConfig() {
* set will insert eventually an operation that runs non-parallel (parallelism of one).
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environments default parallelism should be used.
* returns {@link ExecutionConfig#PARALLELISM_DEFAULT}, if the environment's default parallelism should be used.
*/
public int getParallelism() {
return config.getParallelism();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void startNewSession() throws Exception {

@Override
public String toString() {
return "Local Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
Expand All @@ -47,7 +48,7 @@ public class DataSink<T> {

private String name;

private int parallelism = -1;
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

private Configuration parameters;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class DeltaIteration<ST, WT> {

private String name;

private int parallelism = -1;
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

private boolean solutionSetUnManaged;

Expand Down Expand Up @@ -176,15 +177,16 @@ public String getName() {
* @return The iteration object, for function call chaining.
*/
public DeltaIteration<ST, WT> parallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
"The parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT (use default).");
this.parallelism = parallelism;
return this;
}

/**
* Gets the iteration's parallelism.
*
* @return The iterations parallelism, or -1, if not set.
* @return The iteration's parallelism, or {@link ExecutionConfig#PARALLELISM_DEFAULT} if not set.
*/
public int getParallelism() {
return parallelism;
Expand Down
Loading

0 comments on commit 5350bc4

Please sign in to comment.