Skip to content

Commit

Permalink
[FLINK-12122] Introduce ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY
Browse files Browse the repository at this point in the history
Add config option to enable to evenly spread out slots across all available
TaskExecutors.
  • Loading branch information
tillrohrmann committed Nov 7, 2019
1 parent 7973453 commit 0df281e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 12 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/cluster_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>cluster.evenly-spread-out-slots</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <span markdown="span">`TaskExecutors`</span>.</td>
</tr>
<tr>
<td><h5>cluster.registration.error-delay</h5></td>
<td style="word-wrap: break-word;">10000</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;

import static org.apache.flink.configuration.description.TextElement.code;

/**
* Options which control the cluster behaviour.
Expand Down Expand Up @@ -50,4 +53,13 @@ public class ClusterOptions {
.key("cluster.services.shutdown-timeout")
.defaultValue(30000L)
.withDescription("The shutdown timeout for cluster services like executors in milliseconds.");

public static final ConfigOption<Boolean> EVENLY_SPREAD_OUT_SLOTS_STRATEGY = ConfigOptions
.key("cluster.evenly-spread-out-slots")
.defaultValue(false)
.withDescription(
Description.builder()
.text("Enable the slot spread out allocation strategy. This strategy tries to spread out " +
"the slots evenly across all available %s.", code("TaskExecutors"))
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.resourcemanager.slotmanager.AnyMatchingSlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.LeastUtilizationSlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotMatchingStrategy;
import org.apache.flink.util.Preconditions;

/**
Expand Down Expand Up @@ -54,15 +56,7 @@ public static ResourceManagerRuntimeServices fromConfiguration(
HighAvailabilityServices highAvailabilityServices,
ScheduledExecutor scheduledExecutor) throws Exception {

final SlotManagerConfiguration slotManagerConfiguration = configuration.getSlotManagerConfiguration();

final SlotManager slotManager = new SlotManagerImpl(
AnyMatchingSlotMatchingStrategy.INSTANCE,
scheduledExecutor,
slotManagerConfiguration.getTaskManagerRequestTimeout(),
slotManagerConfiguration.getSlotRequestTimeout(),
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.isWaitResultConsumedBeforeRelease());
final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor);

final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
Expand All @@ -71,4 +65,24 @@ public static ResourceManagerRuntimeServices fromConfiguration(

return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}

private static SlotManager createSlotManager(ResourceManagerRuntimeServicesConfiguration configuration, ScheduledExecutor scheduledExecutor) {
final SlotManagerConfiguration slotManagerConfiguration = configuration.getSlotManagerConfiguration();

final SlotMatchingStrategy slotMatchingStrategy;

if (slotManagerConfiguration.evenlySpreadOutSlots()) {
slotMatchingStrategy = LeastUtilizationSlotMatchingStrategy.INSTANCE;
} else {
slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
}

return new SlotManagerImpl(
slotMatchingStrategy,
scheduledExecutor,
slotManagerConfiguration.getTaskManagerRequestTimeout(),
slotManagerConfiguration.getSlotRequestTimeout(),
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.isWaitResultConsumedBeforeRelease());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
Expand All @@ -41,17 +42,20 @@ public class SlotManagerConfiguration {
private final Time slotRequestTimeout;
private final Time taskManagerTimeout;
private final boolean waitResultConsumedBeforeRelease;
private final boolean evenlySpreadOutSlots;

public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease) {
boolean waitResultConsumedBeforeRelease,
boolean evenlySpreadOutSlots) {

this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
this.evenlySpreadOutSlots = evenlySpreadOutSlots;
}

public Time getTaskManagerRequestTimeout() {
Expand All @@ -70,6 +74,10 @@ public boolean isWaitResultConsumedBeforeRelease() {
return waitResultConsumedBeforeRelease;
}

public boolean evenlySpreadOutSlots() {
return evenlySpreadOutSlots;
}

public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
final Time rpcTimeout;
try {
Expand All @@ -86,7 +94,14 @@ public static SlotManagerConfiguration fromConfiguration(Configuration configura
boolean waitResultConsumedBeforeRelease =
configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout, waitResultConsumedBeforeRelease);
boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);

return new SlotManagerConfiguration(
rpcTimeout,
slotRequestTimeout,
taskManagerTimeout,
waitResultConsumedBeforeRelease,
evenlySpreadOutSlots);
}

private static Time getSlotRequestTimeout(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void confirmLeadership(UUID leaderId, String leaderAddress) {
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
true));
true,
false));
ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
Expand Down

0 comments on commit 0df281e

Please sign in to comment.