Skip to content

Commit

Permalink
[FLINK-8399] [runtime] use independent configurations for the differe…
Browse files Browse the repository at this point in the history
…nt timeouts in slot manager

This closes apache#5271.
  • Loading branch information
tiemsn authored and tillrohrmann committed Jan 15, 2018
1 parent de30d16 commit b459915
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
9 changes: 8 additions & 1 deletion docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ The configuration keys in this section are independent of the used resource mana
- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port
of the JobManager, because the same ActorSystem is used. It's not possible to use this configuration key to define port ranges.


### YARN

- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove this fraction of the memory from the requested heap as a safety margin and add it to the memory used off-heap.
Expand Down Expand Up @@ -649,6 +648,14 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated

- `historyserver.web.ssl.enabled`: Enable HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag `security.ssl.enabled` is set to true (DEFAULT: `false`).

### Slot Manager (Flip-6)

The configuration keys in this section are relevant for the SlotManager running in the Flip-6 ResourceManager.

- `slotmanager.request-timeout`: Timeout after which a slot request will be discarded by the SlotManager. The value is specified in milliseconds (DEFAULT: `600000`).

- `slotmanager.taskmanager-timeout`: Timeout after which an idling task manager's container is released. The value is specified in milliseconds (DEFAULT: `30000`).

## Background


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;

/**
* The set of configuration options relating to the ResourceManager
* The set of configuration options relating to the ResourceManager.
*/
@PublicEvolving
public class ResourceManagerOptions {
Expand Down Expand Up @@ -58,6 +58,22 @@ public class ResourceManagerOptions {
.defaultValue(600)
.withDeprecatedKeys("yarn.heap-cutoff-min");

/**
* The timeout for a slot request to be discarded, in milliseconds.
*
* <p>Configured via the key {@code slotmanager.request-timeout}. The default value of
* {@code 600000} milliseconds corresponds to 10 minutes. The description string attached
* to the option does not state the unit, so it is documented here.
*/
public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = ConfigOptions
.key("slotmanager.request-timeout")
.defaultValue(600000L)
.withDescription("The timeout for a slot request to be discarded.");

/**
* The timeout for an idle task manager to be released, in milliseconds.
*
* <p>Configured via the key {@code slotmanager.taskmanager-timeout}. The default value of
* {@code 30000} milliseconds corresponds to 30 seconds. The description string attached
* to the option does not state the unit, so it is documented here.
*/
public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT = ConfigOptions
.key("slotmanager.taskmanager-timeout")
.defaultValue(30000L)
.withDescription("The timeout for an idle task manager to be released.");

/**
* Prefix for passing custom environment variables to Flink's master process.
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;

import scala.concurrent.duration.Duration;

/**
* Configuration for the {@link SlotManager}.
*/
public class SlotManagerConfiguration {

private final Time taskManagerRequestTimeout;
Expand Down Expand Up @@ -54,15 +59,20 @@ public Time getTaskManagerTimeout() {

public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
final Time timeout;
final Time rpcTimeout;

try {
timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's timeout " +
"value " + AkkaOptions.ASK_TIMEOUT + '.', e);
}

return new SlotManagerConfiguration(timeout, timeout, timeout);
final Time slotRequestTimeout = Time.milliseconds(
configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT));
final Time taskManagerTimeout = Time.milliseconds(
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
}
}

0 comments on commit b459915

Please sign in to comment.