Skip to content

Commit

Permalink
[FLINK-2784] Remove deprecated configuration keys and updated documen…
Browse files Browse the repository at this point in the history
…tation

This closes apache#1244
  • Loading branch information
fhueske authored and vasia committed Oct 9, 2015
1 parent 91ffbc1 commit 3b77faa
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 103 deletions.
2 changes: 1 addition & 1 deletion docs/apis/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ env.execute()
### System Level

A system-wide default parallelism for all execution environments can be defined by setting the
`parallelization.degree.default` property in `./conf/flink-conf.yaml`. See the
`parallelism.default` property in `./conf/flink-conf.yaml`. See the
[Configuration](config.html) documentation for details.

[Back to top](#top)
Expand Down
7 changes: 4 additions & 3 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ results outside of the JVM heap. For setups with larger quantities of memory,
this can improve the efficiency of the operations performed on the memory
(DEFAULT: false).

- `taskmanager.memory.segment-size`: The size of memory buffers used by the
memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

### Other

- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of
Expand Down Expand Up @@ -254,8 +257,6 @@ network stack. This number determines how many streaming data exchange channels
a TaskManager can have at the same time and how well buffered the channels are.
If a job is rejected or you get a warning that the system has not enough buffers
available, increase this value (DEFAULT: 2048).
- `taskmanager.network.bufferSizeInBytes`: The size of the network buffers, in
bytes (DEFAULT: 32768 (= 32 KiBytes)).
- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task
manager reserves on the JVM's heap space for sorting, hash tables, and caching
of intermediate results. If unspecified (-1), the memory manager will take a fixed
Expand Down Expand Up @@ -441,7 +442,7 @@ The number and size of network buffers can be configured with the following
parameters:

- `taskmanager.network.numberOfBuffers`, and
- `taskmanager.network.bufferSizeInBytes`.
- `taskmanager.memory.segment-size`.

### Configuring Temporary I/O Directories

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ public final class ConfigConstants {
*/
public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default";

/**
* The deprecated config parameter defining the default parallelism for jobs.
*/
@Deprecated
public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default";

/**
* Config parameter for the number of re-tries for failed tasks. Setting this
* value to 0 effectively disables fault tolerance.
Expand Down Expand Up @@ -135,12 +129,6 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

/**
* Deprecated config parameter defining the size of the buffers used in the network stack.
*/
@Deprecated
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";

/**
* Config parameter defining the size of memory buffers used by the network stack and the memory manager.
*/
Expand Down Expand Up @@ -205,22 +193,11 @@ public final class ConfigConstants {
*/
public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";

/**
* Upper bound for heap cutoff on YARN.
* The "yarn.heap-cutoff-ratio" is removing a certain ratio from the heap.
* This value is limiting this cutoff to a absolute value.
*
* THE VALUE IS NO LONGER IN USE.
*/
@Deprecated
public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";

/**
* Minimum amount of memory to remove from the heap space as a safety margin.
*/
public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";


/**
* Reallocate failed YARN containers.
*/
Expand Down Expand Up @@ -547,12 +524,6 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;

/**
* Default size of network stack buffers.
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;

/**
* Default size of memory segments in the network stack and the memory manager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,9 @@ public Optimizer(DataStatistics stats, CostEstimator estimator, Configuration co
this.costEstimator = estimator;

// determine the default parallelism
// check for old key string first, then for new one
this.defaultParallelism = config.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM);
// now check for new one which overwrites old values
this.defaultParallelism = config.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
this.defaultParallelism);
ConfigConstants.DEFAULT_PARALLELISM);

if (defaultParallelism < 1) {
LOG.warn("Config value " + defaultParallelism + " for option "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,9 @@ class LocalFlinkMiniCluster(
// set this only if no memory was pre-configured
if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {

val bufferSizeNew: Int = config.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)

val bufferSizeOld: Int = config.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
val bufferSize: Int =
if (bufferSizeNew != -1) {
bufferSizeNew
}
else if (bufferSizeOld == -1) {
// nothing has been configured, take the default
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
}
else {
bufferSizeOld
}
val bufferSize: Int = config.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)

val bufferMem: Long = config.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1758,41 +1758,19 @@ object TaskManager {
checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)

val pageSizeNew: Int = configuration.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)

val pageSizeOld: Int = configuration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)

val pageSize: Int =
if (pageSizeNew != -1) {
// new page size has been configured
checkConfigParameter(pageSizeNew >= MemoryManager.MIN_PAGE_SIZE, pageSizeNew,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)

checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
"Memory segment size must be a power of 2.")

pageSizeNew
}
else if (pageSizeOld == -1) {
// nothing has been configured, take the default
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
}
else {
// old page size has been configured
checkConfigParameter(pageSizeOld >= MemoryManager.MIN_PAGE_SIZE, pageSizeOld,
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
"Minimum buffer size is " + MemoryManager.MIN_PAGE_SIZE)

checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld,
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
"Buffer size must be a power of 2.")

pageSizeOld
}
val pageSize: Int = configuration.getInteger(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)

// check page size of for minimum size
checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)

// check page size for power of two
checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
"Memory segment size must be a power of 2.")

// check whether we use heap or off-heap memory
val memType: MemoryType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testMemoryConfigWrong() {

// something ridiculously high
final long memSize = (((long) Integer.MAX_VALUE - 1) *
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20;
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
try {
TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,10 @@ protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> clas
setParallelism(parallelism);
}
else {
// first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM));
// then for new
// determine parallelism
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
getParallelism()));
ConfigConstants.DEFAULT_PARALLELISM));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ protected StreamPlanEnvironment(ExecutionEnvironment env) {
if (parallelism > 0) {
setParallelism(parallelism);
} else {
// first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM));
// then for new
// determine parallelism
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
getParallelism()));
ConfigConstants.DEFAULT_PARALLELISM));
}
}

Expand Down

0 comments on commit 3b77faa

Please sign in to comment.