Skip to content

Commit

Permalink
[FLINK-24713][Runtime/Coordination] Postpone resourceManager serving …
Browse files Browse the repository at this point in the history
…after the recovery phase has finished

This closes apache#20256
  • Loading branch information
Aitozi authored and xintongsong committed Jul 25, 2022
1 parent 4bbf319 commit 2b9b985
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<td>String</td>
<td>Timeout for jobs which don't have a job manager as leader assigned.</td>
</tr>
<tr>
<td><h5>resourcemanager.previous-worker.recovery.timeout</h5></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>Timeout for resource manager to recover all the previous attempts workers. If exceeded, resource manager will handle new resource requests by requesting new workers. If you would like to reuse the previous workers as much as possible, you should configure a longer timeout time to wait for previous workers to register.</td>
</tr>
<tr>
<td><h5>resourcemanager.rpc.port</h5></td>
<td style="word-wrap: break-word;">0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ public class ResourceManagerOptions {
+ TaskManagerOptions.REGISTRATION_TIMEOUT.key()
+ "'.");

/** Timeout for ResourceManager to recover all the previous attempts workers. */
public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(0))
.withDescription(
"Timeout for resource manager to recover all the previous attempts workers. If exceeded,"
+ " resource manager will handle new resource requests by requesting new workers."
+ " If you would like to reuse the previous workers as much as possible, you should"
+ " configure a longer timeout time to wait for previous workers to register.");

// ---------------------------------------------------------------------------------------------

/** Not intended to be instantiated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,13 @@ public CompletableFuture<Acknowledge> declareRequiredResources(

if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
slotManager.processResourceRequirements(resourceRequirements);

return CompletableFuture.completedFuture(Acknowledge.get());
return getReadyToServeFuture()
.thenApply(
acknowledge -> {
validateRunsInMainThread();
slotManager.processResourceRequirements(resourceRequirements);
return null;
});
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
Expand Down Expand Up @@ -1252,6 +1256,14 @@ protected abstract void internalDeregisterApplication(
*/
public abstract boolean stopWorker(WorkerType worker);

/**
* Get the ready to serve future of the resource manager.
*
* @return The ready to serve future of the resource manager, which indicated whether it is
* ready to serve.
*/
protected abstract CompletableFuture<Void> getReadyToServeFuture();

/**
* Set {@link SlotManager} whether to fail unfulfillable slot requests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.annotation.Nullable;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -126,4 +127,9 @@ private void startStartupPeriod() {
TimeUnit.MILLISECONDS);
}
}

@Override
public CompletableFuture<Void> getReadyToServeFuture() {
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
*/
private CompletableFuture<Void> startWorkerCoolDown;

/** The future indicates whether the rm is ready to serve. */
private final CompletableFuture<Void> readyToServeFuture;

/** Timeout to wait for all the previous attempts workers to recover. */
private final Duration previousWorkerRecoverTimeout;

public ActiveResourceManager(
ResourceManagerDriver<WorkerType> resourceManagerDriver,
Configuration flinkConfig,
Expand All @@ -121,6 +127,7 @@ public ActiveResourceManager(
ThresholdMeter startWorkerFailureRater,
Duration retryInterval,
Duration workerRegistrationTimeout,
Duration previousWorkerRecoverTimeout,
Executor ioExecutor) {
super(
rpcService,
Expand Down Expand Up @@ -150,6 +157,8 @@ public ActiveResourceManager(
this.startWorkerRetryInterval = retryInterval;
this.workerRegistrationTimeout = workerRegistrationTimeout;
this.startWorkerCoolDown = FutureUtils.completedVoidFuture();
this.previousWorkerRecoverTimeout = previousWorkerRecoverTimeout;
this.readyToServeFuture = new CompletableFuture<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -213,7 +222,7 @@ protected void onWorkerRegistered(WorkerType worker) {

final WorkerResourceSpec workerResourceSpec =
currentAttemptUnregisteredWorkers.remove(resourceId);
previousAttemptUnregisteredWorkers.remove(resourceId);
tryRemovePreviousPendingRecoveryTaskManager(resourceId);
if (workerResourceSpec != null) {
final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
log.info(
Expand Down Expand Up @@ -251,6 +260,18 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
"Worker {} recovered from previous attempt.",
resourceId.getStringWithMetadata());
}
if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
scheduleRunAsync(
() -> {
readyToServeFuture.complete(null);
log.info(
"Timeout to wait recovery taskmanagers, recovery future is completed.");
},
previousWorkerRecoverTimeout.toMillis(),
TimeUnit.MILLISECONDS);
} else {
readyToServeFuture.complete(null);
}
}

@Override
Expand Down Expand Up @@ -367,7 +388,7 @@ private boolean clearStateForWorker(ResourceID resourceId) {

WorkerResourceSpec workerResourceSpec =
currentAttemptUnregisteredWorkers.remove(resourceId);
previousAttemptUnregisteredWorkers.remove(resourceId);
tryRemovePreviousPendingRecoveryTaskManager(resourceId);
if (workerResourceSpec != null) {
final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
log.info(
Expand Down Expand Up @@ -425,6 +446,27 @@ private void tryResetWorkerCreationCoolDown() {
}
}

@Override
public CompletableFuture<Void> getReadyToServeFuture() {
return readyToServeFuture;
}

private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
log.info(
"Pending recovery taskmanagers {} -> {}.{}",
sizeBeforeRemove,
previousAttemptUnregisteredWorkers.size(),
previousAttemptUnregisteredWorkers.size() == 0
? " Resource manager is ready to serve."
: "");
}
if (previousAttemptUnregisteredWorkers.size() == 0) {
readyToServeFuture.complete(null);
}
}

/** Always execute on the current main thread executor. */
private class GatewayMainThreadExecutor implements ScheduledExecutor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public ResourceManager<WorkerType> createResourceManager(
configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
final Duration workerRegistrationTimeout =
configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
final Duration previousWorkerRecoverTimeout =
configuration.get(
ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);

return new ActiveResourceManager<>(
createResourceManagerDriver(
configuration, webInterfaceUrl, rpcService.getAddress()),
Expand All @@ -128,6 +132,7 @@ public ResourceManager<WorkerType> createResourceManager(
failureRater,
retryInterval,
workerRegistrationTimeout,
previousWorkerRecoverTimeout,
ioExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,69 @@ void testDisconnectJobManagerClearsRequirements() throws Exception {
assertThat(clearRequirementsFuture.get(5, TimeUnit.SECONDS)).isEqualTo(jobId);
}

@Test
void testProcessResourceRequirementsWhenRecoveryFinished() throws Exception {
final TestingJobMasterGateway jobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setAddress(UUID.randomUUID().toString())
.build();
rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

final JobLeaderIdService jobLeaderIdService =
TestingJobLeaderIdService.newBuilder()
.setGetLeaderIdFunction(
jobId ->
CompletableFuture.completedFuture(
jobMasterGateway.getFencingToken()))
.build();

final CompletableFuture<Void> processRequirementsFuture = new CompletableFuture<>();
final CompletableFuture<Void> readyToServeFuture = new CompletableFuture<>();

final SlotManager slotManager =
new TestingSlotManagerBuilder()
.setProcessRequirementsConsumer(
r -> processRequirementsFuture.complete(null))
.createSlotManager();
resourceManager =
new ResourceManagerBuilder()
.withJobLeaderIdService(jobLeaderIdService)
.withSlotManager(slotManager)
.withReadyToServeFuture(readyToServeFuture)
.buildAndStart();

final JobID jobId = JobID.generate();
final ResourceManagerGateway resourceManagerGateway =
resourceManager.getSelfGateway(ResourceManagerGateway.class);
resourceManagerGateway
.registerJobMaster(
jobMasterGateway.getFencingToken(),
ResourceID.generate(),
jobMasterGateway.getAddress(),
jobId,
TIMEOUT)
.get();

resourceManagerGateway.declareRequiredResources(
jobMasterGateway.getFencingToken(),
ResourceRequirements.create(
jobId,
jobMasterGateway.getAddress(),
Collections.singleton(
ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))),
TIMEOUT);
resourceManager
.runInMainThread(
() -> {
assertThat(processRequirementsFuture.isDone()).isFalse();
readyToServeFuture.complete(null);
assertThat(processRequirementsFuture.isDone()).isTrue();
return null;
},
TIMEOUT)
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}

@Test
void testHeartbeatTimeoutWithJobMaster() throws Exception {
final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -427,12 +490,11 @@ void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
stopWorkerFuture.complete(worker);
return true;
}),
resourceManagerGateway -> {
registerTaskExecutor(
resourceManagerGateway,
taskExecutorId,
taskExecutorGateway.getAddress());
},
resourceManagerGateway ->
registerTaskExecutor(
resourceManagerGateway,
taskExecutorId,
taskExecutorGateway.getAddress()),
resourceManagerResourceId -> {
// might have been completed or not depending whether the timeout was triggered
// first
Expand Down Expand Up @@ -727,6 +789,8 @@ private class ResourceManagerBuilder {
private BlocklistHandler.Factory blocklistHandlerFactory =
new NoOpBlocklistHandler.Factory();
private Function<ResourceID, Boolean> stopWorkerFunction = null;
private CompletableFuture<Void> readyToServeFuture =
CompletableFuture.completedFuture(null);

private ResourceManagerBuilder withHeartbeatServices(HeartbeatServices heartbeatServices) {
this.heartbeatServices = heartbeatServices;
Expand Down Expand Up @@ -756,6 +820,12 @@ private ResourceManagerBuilder withStopWorkerFunction(
return this;
}

public ResourceManagerBuilder withReadyToServeFuture(
CompletableFuture<Void> readyToServeFuture) {
this.readyToServeFuture = readyToServeFuture;
return this;
}

private TestingResourceManager buildAndStart() throws Exception {
if (heartbeatServices == null) {
heartbeatServices = ResourceManagerTest.heartbeatServices;
Expand Down Expand Up @@ -793,7 +863,8 @@ private TestingResourceManager buildAndStart() throws Exception {
jobLeaderIdService,
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
stopWorkerFunction);
stopWorkerFunction,
readyToServeFuture);

resourceManager.start();
resourceManager.getStartedFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand All @@ -31,17 +32,21 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.TimeUtils;

import javax.annotation.Nullable;

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;

/** Simple {@link ResourceManager} implementation for testing purposes. */
public class TestingResourceManager extends ResourceManager<ResourceID> {

private final Function<ResourceID, Boolean> stopWorkerFunction;
private final CompletableFuture<Void> readyToServeFuture;

public TestingResourceManager(
RpcService rpcService,
Expand All @@ -55,7 +60,8 @@ public TestingResourceManager(
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
Function<ResourceID, Boolean> stopWorkerFunction) {
Function<ResourceID, Boolean> stopWorkerFunction,
CompletableFuture<Void> readyToServeFuture) {
super(
rpcService,
leaderSessionId,
Expand All @@ -73,6 +79,7 @@ public TestingResourceManager(
ForkJoinPool.commonPool());

this.stopWorkerFunction = stopWorkerFunction;
this.readyToServeFuture = readyToServeFuture;
}

@Override
Expand Down Expand Up @@ -106,4 +113,13 @@ protected ResourceID workerStarted(ResourceID resourceID) {
public boolean stopWorker(ResourceID worker) {
return stopWorkerFunction.apply(worker);
}

@Override
public CompletableFuture<Void> getReadyToServeFuture() {
return readyToServeFuture;
}

public <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
return callAsync(callable, TimeUtils.toDuration(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,10 @@ public CompletableFuture<Void> getTerminationFuture() {
return getTerminationFutureFunction.apply(
MockResourceManager.this, super.getTerminationFuture());
}

@Override
public CompletableFuture<Void> getReadyToServeFuture() {
return CompletableFuture.completedFuture(null);
}
}
}
Loading

0 comments on commit 2b9b985

Please sign in to comment.