Skip to content

Commit

Permalink
[FLINK-1679] use a consistent name for parallelism
Browse files Browse the repository at this point in the history
* rename occurrences of degree of parallelism to parallelism

* [Dd]egree[ -]of[ -]parallelism -> [pP]arallelism
* (DOP|dop) -> [pP]arallelism
* paraDegree -> parallelism
* degree-of-parallelism -> parallelism
* DEGREE_OF_PARALLELISM -> PARALLELISM
  • Loading branch information
mxm committed Mar 23, 2015
1 parent d994d2e commit cf84bca
Show file tree
Hide file tree
Showing 235 changed files with 1,096 additions and 1,093 deletions.
2 changes: 1 addition & 1 deletion docs/cluster_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ In particular,

* the amount of available memory per TaskManager (`taskmanager.heap.mb`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelization.degree.default`) and
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`taskmanager.tmp.dirs`)

are very important configuration values.
Expand Down
4 changes: 2 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ This value is typically proportional to the number of physical CPU cores that
the TaskManager's machine has (e.g., equal to the number of cores, or half the
number of cores). [More about task slots](config.html#configuring-taskmanager-processing-slots).

- `parallelization.degree.default`: The default degree of parallelism to use for
programs that have no degree of parallelism specified. (DEFAULT: 1). For
- `parallelism.default`: The default parallelism to use for
programs that have no parallelism specified. (DEFAULT: 1). For
setups that have no concurrent jobs running, setting this value to
NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all
available execution resources for the program's execution.
Expand Down
6 changes: 3 additions & 3 deletions docs/gelly_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for ver
Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
These functions are given as parameters to Gelly's `createVertexCentricIteration`, which returns a `VertexCentricIteration`.
The user can configure this iteration (set the name, the degree of parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method:
The user can configure this iteration (set the name, the parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method:

{% highlight java %}
Graph<Long, Double, Double> graph = ...
Expand All @@ -357,8 +357,8 @@ VertexCentricIteration<Long, Double, Double, Double> iteration =
// set the iteration name
iteration.setName("Single Source Shortest Paths");

// set the degree of parallelism
iteration.setDegreeOfParallelism(16);
// set the parallelism
iteration.setParallelism(16);

// run the computation
graph.runVertexCentricIteration(iteration);
Expand Down
2 changes: 1 addition & 1 deletion docs/internal_job_scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ and reacts to finished tasks or execution failures.
The JobManager receives the {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ "JobGraph" %},
which is a representation of the data flow consisting of operators ({% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java "JobVertex" %})
and intermediate results ({% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java "IntermediateDataSet" %}).
Each operator has properies, like the degree of parallelism and the code that it executes.
Each operator has properies, like the parallelism and the code that it executes.
In addition, the JobGraph has a set of attached libraries, that are neccessary to execute the code of the operators.

The JobManager transforms the JobGraph into an {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ "ExecutionGraph" %}.
Expand Down
28 changes: 14 additions & 14 deletions docs/programming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ obtain one using these static methods on class `ExecutionEnvironment`:
getExecutionEnvironment()

createLocalEnvironment()
createLocalEnvironment(int degreeOfParallelism)
createLocalEnvironment(int parallelism)

createRemoteEnvironment(String host, int port, String... jarFiles)
createRemoteEnvironment(String host, int port, int degreeOfParallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
{% endhighlight %}

Typically, you only need to use `getExecutionEnvironment()`, since this
Expand Down Expand Up @@ -318,10 +318,10 @@ obtain one using these static methods on class `ExecutionEnvironment`:
{% highlight scala %}
def getExecutionEnvironment

def createLocalEnvironment(degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()))
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()))

def createRemoteEnvironment(host: String, port: String, jarFiles: String*)
def createRemoteEnvironment(host: String, port: String, degreeOfParallelism: Int, jarFiles: String*)
def createRemoteEnvironment(host: String, port: String, parallelism: Int, jarFiles: String*)
{% endhighlight %}

Typically, you only need to use `getExecutionEnvironment()`, since this
Expand Down Expand Up @@ -2074,7 +2074,7 @@ val myLongs = env.fromCollection(longIt)
</div>

**Note:** Currently, the collection data source requires that data types and iterators implement
`Serializable`. Furthermore, collection data sources can not be executed in parallel (degree of
`Serializable`. Furthermore, collection data sources can not be executed in parallel (
parallelism = 1).

[Back to top](#top)
Expand Down Expand Up @@ -2704,15 +2704,15 @@ Parallel Execution
This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task's
input data. The number of parallel instances of a task is called its *parallelism* or *degree of
parallelism (DOP)*.
input data. The number of parallel instances of a task is called its *parallelism*.

The degree of parallelism of a task can be specified in Flink on different levels.

The parallelism of a task can be specified in Flink on different levels.

### Operator Level

The parallelism of an individual operator, data source, or data sink can be defined by calling its
`setParallelism()` method. For example, the degree of parallelism of the `Sum` operator in the
`setParallelism()` method. For example, the parallelism of the `Sum` operator in the
[WordCount](#example-program) example program can be set to `5` as follows :


Expand Down Expand Up @@ -2749,21 +2749,21 @@ env.execute("Word Count Example")

### Execution Environment Level

Flink programs are executed in the context of an [execution environmentt](#program-skeleton). An
Flink programs are executed in the context of an [execution environment](#program-skeleton). An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. Execution environment parallelism can be overwritten by explicitly configuring the
parallelism of an operator.

The default parallelism of an execution environment can be specified by calling the
`setDegreeOfParallelism()` method. To execute all operators, data sources, and data sinks of the
`setParallelism()` method. To execute all operators, data sources, and data sinks of the
[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
execution environment as follows:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(3);
env.setParallelism(3);

DataSet<String> text = [...]
DataSet<Tuple2<String, Integer>> wordCounts = [...]
Expand All @@ -2775,7 +2775,7 @@ env.execute("Word Count Example");
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
env.setDegreeOfParallelism(3)
env.setParallelism(3)

val text = [...]
val wordCounts = text
Expand All @@ -2792,7 +2792,7 @@ env.execute("Word Count Example")
### System Level

A system-wide default parallelism for all execution environments can be defined by setting the
`parallelization.degree.default` property in `./conf/flink-conf.yaml`. See the
`parallelism.default` property in `./conf/flink-conf.yaml`. See the
[Configuration](config.html) documentation for details.

[Back to top](#top)
Expand Down
2 changes: 1 addition & 1 deletion docs/setup_quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ In particular,

* the amount of available memory per TaskManager (`taskmanager.heap.mb`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelization.degree.default`) and
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`taskmanager.tmp.dirs`)


Expand Down
6 changes: 3 additions & 3 deletions docs/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ The user is expected to connect to the outside world through the source and the

The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(sourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.

To create parallel sources the users source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have the parallelism of the environment. The degree of parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(dop)`.
To create parallel sources the users source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have the parallelism of the environment. The parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(parallelism)`.

There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:

Expand Down Expand Up @@ -751,7 +751,7 @@ The above call would create global windows of 1000 elements group it by the firs
Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.

#### Global vs local discretisation
By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a degree of parallelism of 1 to be able to correctly execute the discretisation logic.
By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a parallelism of 1 to be able to correctly execute the discretisation logic.

Sometimes it is sufficient to create local discretisations, which allows the discretiser to run in parallel and apply the given discretisation logic at every discretiser instance. To allow local discretisation use the `local()` method of the windowed data stream.

Expand Down Expand Up @@ -1109,7 +1109,7 @@ Operator Settings

### Parallelism

Setting parallelism for operators works exactly the same way as in the batch Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
Setting parallelism for operators works exactly the same way as in the batch Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(parallelism)` method.

### Buffer timeout

Expand Down
2 changes: 1 addition & 1 deletion docs/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` en

The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something.

Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelization.degree.default` if the number of slots has been specified.
Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.

If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class CliFrontend {
// YARN-session related constants
public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";

public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
Expand Down Expand Up @@ -175,18 +175,18 @@ public CliFrontend(String configDir) throws Exception {
throw new Exception("Cannot read the YARN properties file", e);
}

// configure the default degree of parallelism from YARN
String propDegree = yarnProperties.getProperty(YARN_PROPERTIES_DOP);
if (propDegree != null) { // maybe the property is not set
// configure the default parallelism from YARN
String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
if (propParallelism != null) { // maybe the property is not set
try {
int paraDegree = Integer.parseInt(propDegree);
this.config.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
int parallelism = Integer.parseInt(propParallelism);
this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);

logAndSysout("YARN properties set default parallelism to " + paraDegree);
logAndSysout("YARN properties set default parallelism to " + parallelism);
}
catch (NumberFormatException e) {
throw new Exception("Error while parsing the YARN properties: " +
"Property " + YARN_PROPERTIES_DOP + " is not an integer.");
"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ public int run(String[] args) {
Properties yarnProps = new Properties();
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
if (flinkYarnClient.getTaskManagerSlots() != -1) {
String degreeOfParallelism =
String parallelism =
Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, degreeOfParallelism);
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism);
}
// add dynamic properties
if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ else if (prog.isUsingInteractiveMode()) {
// temporary hack to support the optimizer plan preview
OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
if (parallelism > 0) {
env.setDegreeOfParallelism(parallelism);
env.setParallelism(parallelism);
}
env.setAsContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);

return this.client.run(toRun, getDegreeOfParallelism(), true);
return this.client.run(toRun, getParallelism(), true);
}

@Override
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan("unnamed job");

OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getDegreeOfParallelism());
OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());

PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
Expand All @@ -68,7 +68,7 @@ public String getExecutionPlan() throws Exception {

@Override
public String toString() {
return "Context Environment (DOP = " + (getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism())
return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
+ ") : " + getIdString();
}

Expand Down Expand Up @@ -118,7 +118,7 @@ public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach,
public ExecutionEnvironment createExecutionEnvironment() {
ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader);
if (defaultParallelism > 0) {
env.setDegreeOfParallelism(defaultParallelism);
env.setParallelism(defaultParallelism);
}
return env;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ExecutionConfig implements Serializable {
public static final String CONFIG_KEY = "runtime.config";

/**
* The constant to use for the degree of parallelism, if the system should use the number
* The constant to use for the parallelism, if the system should use the number
* of currently available slots.
*/
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
Expand Down
Loading

0 comments on commit cf84bca

Please sign in to comment.