Skip to content

Commit

Permalink
[FLINK-12736][coordination] Release TaskExecutor in SlotManager only …
Browse files Browse the repository at this point in the history
…if there were no slot allocations after the partition check

The ResourceManager looks out for TaskManagers that have not had any slots allocated on them for a while, as these could be released to safe resources.
If such a TM is found, the RM checks via an RPC call whether the TM still holds any partitions. If no partition is held then the TM is released.
However, in the RPC callback no check is made whether the TM is actually still idle. In the meantime a slot could have been allocated on the TM.
Even if the slot has been freed, there can be newly allocated partitions not included in check result.

To make sure there was no resource allocation in between, we can mark the taskManagerRegistration.getIdleSince() time before starting the async 'no partition' check.
The TM can be released only if the idle time after the check matches the previously marked one. Otherwise we discard the release and start over after the next timeout.

This closes apache#8988.
  • Loading branch information
azagrebin authored and tillrohrmann committed Jul 9, 2019
1 parent c773ce5 commit 5c762ce
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1023,22 +1023,32 @@ void checkTaskManagerTimeouts() {

// second we trigger the release resource callback which can decide upon the resource release
for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
if (waitResultConsumedBeforeRelease) {
// checking whether TaskManagers can be safely removed
taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
.thenAcceptAsync(canBeReleased -> {
if (canBeReleased) {
releaseTaskExecutor(timedOutTaskManagerId);
}},
mainThreadExecutor);
releaseTaskExecutorIfPossible(taskManagerRegistration);
} else {
releaseTaskExecutor(timedOutTaskManagerId);
releaseTaskExecutor(taskManagerRegistration.getInstanceId());
}
}
}
}

private void releaseTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
long idleSince = taskManagerRegistration.getIdleSince();
taskManagerRegistration
.getTaskManagerConnection()
.getTaskExecutorGateway()
.canBeReleased()
.thenAcceptAsync(
canBeReleased -> {
InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
boolean stillIdle = idleSince == taskManagerRegistration.getIdleSince();
if (stillIdle && canBeReleased) {
releaseTaskExecutor(timedOutTaskManagerId);
}
},
mainThreadExecutor);
}

private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -719,7 +718,7 @@ public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceID resourceID = ResourceID.generate();

final AtomicBoolean canBeReleased = new AtomicBoolean(false);
final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setCanBeReleasedSupplier(canBeReleased::get)
.createTestingTaskExecutorGateway();
Expand All @@ -742,14 +741,31 @@ public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));

// now it can not be released yet
canBeReleased.set(false);
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
canBeReleased.set(new CompletableFuture<>());
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
mainThreadExecutor.triggerAll();
assertFalse(releaseFuture.isDone());
canBeReleased.get().complete(false);
mainThreadExecutor.triggerAll();
assertThat(releaseFuture.isDone(), is(false));

// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
canBeReleased.set(new CompletableFuture<>());
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
mainThreadExecutor.triggerAll();
AllocationID allocationID = new AllocationID();
slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
mainThreadExecutor.triggerAll();
slotManager.freeSlot(slotId, allocationID);
canBeReleased.get().complete(true);
mainThreadExecutor.triggerAll();
assertThat(releaseFuture.isDone(), is(false));

// now it can and should be released
canBeReleased.set(true);
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
canBeReleased.set(new CompletableFuture<>());
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
mainThreadExecutor.triggerAll();
canBeReleased.get().complete(true);
mainThreadExecutor.triggerAll();
assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {

private final Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction;

private final Supplier<Boolean> canBeReleasedSupplier;
private final Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier;

private final BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer;

Expand All @@ -87,7 +87,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
Consumer<ResourceID> heartbeatResourceManagerConsumer,
Consumer<Exception> disconnectResourceManagerConsumer,
Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction,
Supplier<Boolean> canBeReleasedSupplier,
Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier,
BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
Expand Down Expand Up @@ -186,7 +186,7 @@ public CompletableFuture<SerializableOptional<String>> requestMetricQueryService

@Override
public CompletableFuture<Boolean> canBeReleased() {
return CompletableFuture.completedFuture(canBeReleasedSupplier.get());
return canBeReleasedSupplier.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TestingTaskExecutorGatewayBuilder {
private Consumer<ResourceID> heartbeatResourceManagerConsumer = NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER;
private Consumer<Exception> disconnectResourceManagerConsumer = NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER;
private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
private Supplier<Boolean> canBeReleasedSupplier = () -> true;
private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () -> CompletableFuture.completedFuture(true);
private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER;

public TestingTaskExecutorGatewayBuilder setAddress(String address) {
Expand Down Expand Up @@ -117,7 +117,7 @@ public TestingTaskExecutorGatewayBuilder setCancelTaskFunction(Function<Executio
return this;
}

public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<Boolean> canBeReleasedSupplier) {
public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier) {
this.canBeReleasedSupplier = canBeReleasedSupplier;
return this;
}
Expand Down

0 comments on commit 5c762ce

Please sign in to comment.