Skip to content

Commit

Permalink
[FLINK-21428][runtime] Introduces new option taskmanager.slot.timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp authored and zentol committed Mar 10, 2021
1 parent c2639a1 commit a4e0efd
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,11 @@
<td>String</td>
<td>The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.</td>
</tr>
<tr>
<td><h5>taskmanager.slot.timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>Timeout used for identifying inactive slots. The TaskManager will free the slot if it does not become active within the given amount of time. Inactive slots can be caused by an out-dated slot request.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,11 @@
<td>String</td>
<td>The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.</td>
</tr>
<tr>
<td><h5>taskmanager.slot.timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>Timeout used for identifying inactive slots. The TaskManager will free the slot if it does not become active within the given amount of time. Inactive slots can be caused by an out-dated slot request.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ public class TaskManagerOptions {
+ " is typically proportional to the number of physical CPU cores that the TaskManager's machine has"
+ " (e.g., equal to the number of cores, or half the number of cores).");

/** Timeout for identifying inactive slots. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
public static final ConfigOption<Duration> SLOT_TIMEOUT =
ConfigOptions.key("taskmanager.slot.timeout")
.durationType()
.defaultValue(TimeUtils.parseDuration("10 s"))
.withDescription(
"Timeout used for identifying inactive slots. The TaskManager will free the slot if it does not become active "
+ "within the given amount of time. Inactive slots can be caused by an out-dated slot request.");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
key("taskmanager.debug.memory.log")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public CompletableFuture<Acknowledge> submitTask(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getTimeout());
taskManagerConfiguration.getRpcTimeout());

final TaskOperatorEventGateway taskOperatorEventGateway =
new RpcTaskOperatorEventGateway(
Expand Down Expand Up @@ -1051,7 +1051,7 @@ private void allocateSlot(
jobId,
allocationId,
resourceProfile,
taskManagerConfiguration.getTimeout())) {
taskManagerConfiguration.getSlotTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
Expand Down Expand Up @@ -1264,7 +1264,7 @@ private void establishResourceManagerConnection(
getResourceID(),
taskExecutorRegistrationId,
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
taskManagerConfiguration.getRpcTimeout());

slotReportResponseFuture.whenCompleteAsync(
(acknowledge, throwable) -> {
Expand Down Expand Up @@ -1410,7 +1410,9 @@ private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnec

CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
jobMasterGateway.offerSlots(
getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());
getResourceID(),
reservedSlots,
taskManagerConfiguration.getRpcTimeout());

acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
Expand Down Expand Up @@ -1580,7 +1582,7 @@ private void disconnectJobManagerConnection(
for (AllocationID activeSlotAllocationID : activeSlotAllocationIDs) {
try {
if (!taskSlotTable.markSlotInactive(
activeSlotAllocationID, taskManagerConfiguration.getTimeout())) {
activeSlotAllocationID, taskManagerConfiguration.getSlotTimeout())) {
freeSlotInternal(activeSlotAllocationID, freeingCause);
}
} catch (SlotNotFoundException e) {
Expand Down Expand Up @@ -1616,7 +1618,7 @@ private JobTable.Connection associateWithJobManager(
new RpcResultPartitionConsumableNotifier(
jobMasterGateway,
getRpcService().getExecutor(),
taskManagerConfiguration.getTimeout());
taskManagerConfiguration.getRpcTimeout());

PartitionProducerStateChecker partitionStateChecker =
new RpcPartitionStateChecker(jobMasterGateway);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

private final String[] tmpDirectories;

private final Time timeout;
private final Time rpcTimeout;

private final Time slotTimeout;

// null indicates an infinite duration
@Nullable private final Time maxRegistrationDuration;
Expand All @@ -76,7 +78,8 @@ public TaskManagerConfiguration(
ResourceProfile defaultSlotResourceProfile,
ResourceProfile totalResourceProfile,
String[] tmpDirectories,
Time timeout,
Time rpcTimeout,
Time slotTimeout,
@Nullable Time maxRegistrationDuration,
Configuration configuration,
boolean exitJvmOnOutOfMemory,
Expand All @@ -90,7 +93,8 @@ public TaskManagerConfiguration(
this.defaultSlotResourceProfile = defaultSlotResourceProfile;
this.totalResourceProfile = totalResourceProfile;
this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
this.timeout = Preconditions.checkNotNull(timeout);
this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
this.slotTimeout = Preconditions.checkNotNull(slotTimeout);
this.maxRegistrationDuration = maxRegistrationDuration;
this.configuration =
new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
Expand All @@ -114,8 +118,12 @@ public ResourceProfile getTotalResourceProfile() {
return totalResourceProfile;
}

public Time getTimeout() {
return timeout;
public Time getRpcTimeout() {
return rpcTimeout;
}

public Time getSlotTimeout() {
return slotTimeout;
}

@Nullable
Expand Down Expand Up @@ -178,17 +186,20 @@ public static TaskManagerConfiguration fromConfiguration(

final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

final Time timeout;
final Time rpcTimeout;
try {
timeout = AkkaUtils.getTimeoutAsTime(configuration);
rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
} catch (Exception e) {
throw new IllegalArgumentException(
"Invalid format for '"
+ AkkaOptions.ASK_TIMEOUT.key()
+ "'.Use formats like '50 s' or '1 min' to specify the timeout.");
+ "'. Use formats like '50 s' or '1 min' to specify the timeout.");
}

LOG.debug("Messages have a max timeout of " + timeout);
LOG.debug("Messages have a max timeout of " + rpcTimeout);

final Time slotTimeout =
Time.milliseconds(configuration.get(TaskManagerOptions.SLOT_TIMEOUT).toMillis());

Time finiteRegistrationDuration;
try {
Expand Down Expand Up @@ -235,7 +246,8 @@ public static TaskManagerConfiguration fromConfiguration(
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
taskExecutorResourceSpec),
tmpDirPaths,
timeout,
rpcTimeout,
slotTimeout,
finiteRegistrationDuration,
configuration,
exitOnOom,
Expand Down

0 comments on commit a4e0efd

Please sign in to comment.