Skip to content

Commit

Permalink
[FLINK-10941][RM] Release task executor after all its result partitio…
Browse files Browse the repository at this point in the history
…ns are released

The task executor gateway can use NetworkEnvironment's ResultPartitionManager to check for partitions that are still registered and unreleased, because it is the central point for managing partitions. This also simplifies the query, which currently goes through TaskSlot and lingering Tasks. Eventually, NetworkEnvironment will become the ShuffleService, which could decide, in one way or another, whether the producer can be released.

This still makes the producer task executor depend on the consumer task executor. Although we do not see any problems with that now, I think it is safer to introduce an option as a feature flag. By default, the task executor will wait for consumers, as this PR suggests, but users can always fall back to the previous behaviour using the option.
  • Loading branch information
azagrebin authored and pnowojski committed Apr 12, 2019
1 parent 8aa9deb commit 264a0c9
Show file tree
Hide file tree
Showing 21 changed files with 294 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ public class ResourceManagerOptions {
.text("The timeout for an idle task manager to be released.")
.build());

/**
 * Controls whether an idle task executor is released only after each result partition it produced
 * has been either consumed or failed. The default is {@code true}.
 *
 * <p>Currently, a produced result partition is released when it fails or when the consumer sends a
 * close request to confirm the successful end of consumption and to close the communication channel.
 *
 * <p>When set to {@code false}, the release of an idle task executor is not blocked by that
 * confirmation and can happen as soon as {@code resourcemanager.taskmanager-timeout} has elapsed.
 *
 * @deprecated The default value should be reasonable in all cases; this option exists only to fall
 * back to the older behaviour and will be removed or refactored in the future.
 */
@Deprecated
public static final ConfigOption<Boolean> TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED = ConfigOptions
.key("resourcemanager.taskmanager-release.wait.result.consumed")
.defaultValue(true)
.withDescription(Description.builder()
.text("Release task executor only when each produced result partition is either consumed or failed. " +
"'True' is default. 'False' means that idle task executor release is not blocked " +
"by receiver confirming consumption of result partition " +
"and can happen right away after 'resourcemanager.taskmanager-timeout' has elapsed. " +
"Setting this option to 'false' can speed up task executor release but can lead to unexpected failures " +
"if end of consumption is slower than 'resourcemanager.taskmanager-timeout'.")
.build());

/**
* Prefix for passing custom environment variables to Flink's master process.
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public void close() throws IOException {
reader.releaseAllResources();
markAsReleased(reader.getReceiverId());
}
allReaders.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,15 @@ void onConsumedPartition(ResultPartition partition) {
LOG.debug("Released {}.", partition);
}
}

/**
 * Reports whether every partition currently held in {@code registeredPartitions} has been released.
 *
 * <p>The check runs while holding the monitor of {@code registeredPartitions}.
 *
 * @return {@code true} if no registered partition is unreleased, which includes the case where
 *         nothing is registered; {@code false} as soon as one unreleased partition is found
 */
public boolean areAllPartitionsReleased() {
	synchronized (registeredPartitions) {
		return registeredPartitions.values().stream().allMatch(ResultPartition::isReleased);
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public static ResourceManagerRuntimeServices fromConfiguration(
scheduledExecutor,
slotManagerConfiguration.getTaskManagerRequestTimeout(),
slotManagerConfiguration.getSlotRequestTimeout(),
slotManagerConfiguration.getTaskManagerTimeout());
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.isWaitResultConsumedBeforeRelease());

final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,21 @@ public class SlotManager implements AutoCloseable {
/** True iff the component has been started. */
private boolean started;

/** Release task executor only when each produced result partition is either consumed or failed. */
private final boolean waitResultConsumedBeforeRelease;

public SlotManager(
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout) {
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease) {

this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;

slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
Expand Down Expand Up @@ -999,30 +1005,46 @@ private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
// Internal timeout methods
// ---------------------------------------------------------------------------------------------

// Collects the task executors whose idle time has reached taskManagerTimeout and then releases
// them: directly when waitResultConsumedBeforeRelease is false, otherwise only after the task
// executor answers canBeReleased() with true.
private void checkTaskManagerTimeouts() {
@VisibleForTesting
void checkTaskManagerTimeouts() {
if (!taskManagerRegistrations.isEmpty()) {
long currentTime = System.currentTimeMillis();

ArrayList<InstanceID> timedOutTaskManagerIds = new ArrayList<>(taskManagerRegistrations.size());
ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());

// first retrieve the timed out TaskManagers
for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
// we collect the instance ids first in order to avoid concurrent modifications by the
// ResourceActions.releaseResource call
timedOutTaskManagerIds.add(taskManagerRegistration.getInstanceId());
timedOutTaskManagers.add(taskManagerRegistration);
}
}

// second we trigger the release resource callback which can decide upon the resource release
final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
resourceActions.releaseResource(timedOutTaskManagerId, cause);
for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
if (waitResultConsumedBeforeRelease) {
// checking whether TaskManagers can be safely removed
// NOTE(review): the idle-timeout check above runs before this asynchronous round trip and the
// callback only looks at canBeReleased; the registration may presumably have stopped being idle
// by the time the callback runs. Consider re-checking getIdleSince() in the callback. TODO confirm.
// NOTE(review): no handler for exceptional completion is attached to this future, so a failed
// canBeReleased() call appears to be dropped without a log entry. Verify this is intended.
taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
.thenAcceptAsync(canBeReleased -> {
if (canBeReleased) {
releaseTaskExecutor(timedOutTaskManagerId);
}},
mainThreadExecutor);
} else {
releaseTaskExecutor(timedOutTaskManagerId);
}
}
}
}

/**
 * Asks the resource actions to release the given task executor, citing the idle timeout as the cause.
 *
 * @param timedOutTaskManagerId instance id of the task executor to release
 */
private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
	LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
	resourceActions.releaseResource(
		timedOutTaskManagerId,
		new FlinkException("TaskExecutor exceeded the idle timeout."));
}

private void checkSlotRequestTimeouts() {
if (!pendingSlotRequests.isEmpty()) {
long currentTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ public class SlotManagerConfiguration {
private final Time taskManagerRequestTimeout;
private final Time slotRequestTimeout;
private final Time taskManagerTimeout;
private final boolean waitResultConsumedBeforeRelease;

public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout) {
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease) {

this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
}

public Time getTaskManagerRequestTimeout() {
Expand All @@ -63,6 +67,10 @@ public Time getTaskManagerTimeout() {
return taskManagerTimeout;
}

/**
 * Returns the configured value of the wait-for-result-consumption flag.
 *
 * @return the {@code waitResultConsumedBeforeRelease} value this configuration was created with
 */
public boolean isWaitResultConsumedBeforeRelease() {
	return this.waitResultConsumedBeforeRelease;
}

public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
final Time rpcTimeout;
Expand All @@ -78,7 +86,10 @@ public static SlotManagerConfiguration fromConfiguration(Configuration configura
final Time taskManagerTimeout = Time.milliseconds(
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
boolean waitResultConsumedBeforeRelease =
configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout, waitResultConsumedBeforeRelease);
}

private static Time getSlotRequestTimeout(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ public TaskExecutor(
this.stackTraceSampleService = new StackTraceSampleService(rpcService.getScheduledExecutor());
}

/**
 * Answers with an already-completed future whose value is whatever the result partition manager
 * of the network environment reports from {@code areAllPartitionsReleased()}.
 */
@Override
public CompletableFuture<Boolean> canBeReleased() {
	final boolean allPartitionsReleased = taskExecutorServices
		.getNetworkEnvironment()
		.getResultPartitionManager()
		.areAllPartitionsReleased();
	return CompletableFuture.completedFuture(allPartitionsReleased);
}

// ------------------------------------------------------------------------
// Life cycle
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,11 @@ CompletableFuture<Acknowledge> freeSlot(
* @return Future gateway of Metric Query Service on the TaskManager.
*/
CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(@RpcTimeout Time timeout);

/**
 * Checks whether the task executor can be released. It cannot be released if there are
 * unconsumed result partitions.
 *
 * @return Future flag indicating whether the task executor can be released.
 */
CompletableFuture<Boolean> canBeReleased();
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public boolean markReleasing() {
*/
public SlotOffer generateSlotOffer() {
Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,
"The task slot is not in state active or allocated.");
"The task slot is not in state active or allocated.");
Preconditions.checkState(allocationId != null, "The task slot are not allocated");

return new SlotOffer(allocationId, index, resourceProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ private final class AllocationIDIterator implements Iterator<AllocationID> {
private final Iterator<TaskSlot> iterator;

private AllocationIDIterator(JobID jobId, TaskSlotState state) {
iterator = new TaskSlotIterator(jobId, state);
iterator = new TaskSlotIterator(jobId, state);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public void execute(@Nonnull Runnable command) {
}
}

/**
 * Drains the queue: keeps calling {@link #trigger()} for as long as
 * {@code numQueuedRunnables()} reports a positive count.
 */
public void triggerAll() {
	for (int pending = numQueuedRunnables(); pending > 0; pending = numQueuedRunnables()) {
		trigger();
	}
}

/**
* Triggers the next queued runnable and executes it synchronously.
* This method throws an exception if no Runnable is currently queued.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void confirmLeaderSessionID(UUID leaderId) {
new SlotManagerConfiguration(
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime()));
TestingUtils.infiniteTime(),
true));
ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -137,11 +137,9 @@ private ResourceManager<?> createAndStartResourceManager() throws Exception {
rpcService.getScheduledExecutor(),
Time.minutes(5L));

final SlotManager slotManager = new SlotManager(
rpcService.getScheduledExecutor(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime());
final SlotManager slotManager = SlotManagerBuilder.newBuilder()
.setScheduledExecutor(rpcService.getScheduledExecutor())
.build();

ResourceManager<?> resourceManager = new StandaloneResourceManager(
rpcService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
Expand All @@ -44,7 +45,6 @@
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -136,11 +136,9 @@ private StandaloneResourceManager createAndStartResourceManager(LeaderElectionSe
HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);

SlotManager slotManager = new SlotManager(
rpcService.getScheduledExecutor(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime());
SlotManager slotManager = SlotManagerBuilder.newBuilder()
.setScheduledExecutor(rpcService.getScheduledExecutor())
.build();

JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
Expand Down Expand Up @@ -235,11 +236,9 @@ private void runHeartbeatTimeoutTest(
}

private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices) throws Exception {
final SlotManager slotManager = new SlotManager(
rpcService.getScheduledExecutor(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime());
final SlotManager slotManager = SlotManagerBuilder.newBuilder()
.setScheduledExecutor(rpcService.getScheduledExecutor())
.build();
final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
rpcService.getScheduledExecutor(),
Expand Down
Loading

0 comments on commit 264a0c9

Please sign in to comment.