Skip to content

Commit

Permalink
[FLINK-22180][coordination] Only reclaim slots if job is removed
Browse files: browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 15, 2021
1 parent 5bbaa9b commit 9fd6ecf
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,7 @@ protected void closeJobManagerConnection(
jmResourceIdRegistrations.remove(jobManagerResourceId);

if (resourceRequirementHandling == ResourceRequirementHandling.CLEAR) {
slotManager.processResourceRequirements(
ResourceRequirements.empty(jobId, jobMasterGateway.getAddress()));
slotManager.clearResourceRequirements(jobId);
}

// tell the job manager about the disconnect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ public void close() throws Exception {
// Public API
// ---------------------------------------------------------------------------------------------

/**
* Clears the resource requirements of the given job.
*
* <p>After the initialization check this asks for inactive slots of the job to be reclaimed,
* drops the job's entry from {@code jobMasterTargetAddresses}, and reports an empty requirement
* list for the job to the resource tracker.
*
* @param jobId job for which to clear the requirements
*/
@Override
public void clearResourceRequirements(JobID jobId) {
checkInit();
// NOTE(review): reclaiming happens before the job's state is removed below; keep this order
// unless maybeReclaimInactiveSlots is confirmed not to depend on that state.
maybeReclaimInactiveSlots(jobId);
jobMasterTargetAddresses.remove(jobId);
// an empty requirement list is how "no requirements" is reported to the tracker
resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
}

@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
checkInit();
Expand All @@ -257,11 +265,7 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
resourceRequirements.getResourceRequirements());
}

if (resourceRequirements.getResourceRequirements().isEmpty()) {
jobMasterTargetAddresses.remove(resourceRequirements.getJobId());

maybeReclaimInactiveSlots(resourceRequirements.getJobId());
} else {
if (!resourceRequirements.getResourceRequirements().isEmpty()) {
jobMasterTargetAddresses.put(
resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ public void close() throws Exception {
// Public API
// ---------------------------------------------------------------------------------------------

/**
* Clears the resource requirements of the given job by dropping its entry from {@code
* jobMasterTargetAddresses} and reporting an empty requirement list to the resource tracker.
*
* @param jobId job for which to clear the requirements
*/
@Override
public void clearResourceRequirements(JobID jobId) {
// NOTE(review): unlike processResourceRequirements in this class, no checkInit() is performed
// here — confirm this is intentional.
jobMasterTargetAddresses.remove(jobId);
resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
}

@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
checkInit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
Expand Down Expand Up @@ -91,6 +92,14 @@ void start(
/** Suspends the component. This clears the internal state of the slot manager. */
void suspend();

/**
* Notifies the slot manager that the resource requirements for the given job should be cleared.
* The slot manager may assume that no further updates to the resource requirements will occur.
*
* <p>This is a separate signal from declaring an empty set of requirements through {@code
* processResourceRequirements}; implementations may use it to release state they keep for the
* job (for example, to reclaim inactive slots).
*
* @param jobId job for which to clear the requirements
*/
void clearResourceRequirements(JobID jobId);

/**
* Notifies the slot manager about the resource requirements of a job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ public void close() throws Exception {
// Public API
// ---------------------------------------------------------------------------------------------

/**
* No-op in this implementation: the call is accepted and ignored.
*
* @param jobId job for which to clear the requirements (unused)
*/
@Override
public void clearResourceRequirements(JobID jobId) {}

@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
// no-op; don't throw an UnsupportedOperationException here because there are code paths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
Expand All @@ -56,6 +59,7 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -227,6 +231,60 @@ private void registerTaskExecutor(
assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
}

/**
 * Verifies that disconnecting a job manager causes the resource manager to clear the job's
 * resource requirements on the slot manager.
 */
@Test
public void testDisconnectJobManagerClearsRequirements() throws Exception {
    // job master endpoint the resource manager can connect to
    final TestingJobMasterGateway jobMasterGateway =
            new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
    final String jobMasterAddress = jobMasterGateway.getAddress();
    rpcService.registerGateway(jobMasterAddress, jobMasterGateway);

    // leader id lookups always resolve to the testing job master
    final JobLeaderIdService leaderIdService =
            TestingJobLeaderIdService.newBuilder()
                    .setGetLeaderIdFunction(
                            ignoredJobId ->
                                    CompletableFuture.completedFuture(
                                            jobMasterGateway.getFencingToken()))
                    .build();

    // completed with the job id once the slot manager is asked to clear requirements
    final CompletableFuture<JobID> clearedJobFuture = new CompletableFuture<>();
    final SlotManager recordingSlotManager =
            new TestingSlotManagerBuilder()
                    .setClearRequirementsConsumer(clearedJobFuture::complete)
                    .createSlotManager();

    resourceManager =
            createAndStartResourceManager(
                    heartbeatServices, leaderIdService, recordingSlotManager);
    final ResourceManagerGateway gateway =
            resourceManager.getSelfGateway(ResourceManagerGateway.class);

    final JobID jobId = JobID.generate();

    // step 1: register the job manager
    gateway.registerJobManager(
                    jobMasterGateway.getFencingToken(),
                    ResourceID.generate(),
                    jobMasterAddress,
                    jobId,
                    TIMEOUT)
            .get();

    // step 2: declare a single required slot for the job
    final ResourceRequirements singleSlotRequirements =
            ResourceRequirements.create(
                    jobId,
                    jobMasterAddress,
                    Collections.singleton(
                            ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
    gateway.declareRequiredResources(
                    jobMasterGateway.getFencingToken(), singleSlotRequirements, TIMEOUT)
            .get();

    // step 3: disconnect the job manager with a terminal job status
    gateway.disconnectJobManager(
            jobId, JobStatus.FINISHED, new FlinkException("Test exception"));

    final JobID clearedJobId = clearedJobFuture.get(5, TimeUnit.SECONDS);
    assertThat(clearedJobId, is(jobId));
}

@Test
public void testHeartbeatTimeoutWithJobMaster() throws Exception {
final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -396,6 +454,15 @@ private TestingResourceManager createAndStartResourceManager(
.setScheduledExecutor(rpcService.getScheduledExecutor())
.build();

return createAndStartResourceManager(heartbeatServices, jobLeaderIdService, slotManager);
}

private TestingResourceManager createAndStartResourceManager(
HeartbeatServices heartbeatServices,
JobLeaderIdService jobLeaderIdService,
SlotManager slotManager)
throws Exception {

final TestingResourceManager resourceManager =
new TestingResourceManager(
rpcService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1375,8 +1375,7 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw
}

@Test
public void testReclaimInactiveSlotsOnEmptyRequirements() throws Exception {

public void testReclaimInactiveSlotsOnClearRequirements() throws Exception {
final CompletableFuture<JobID> freeInactiveSlotsJobIdFuture = new CompletableFuture<>();

final TestingTaskExecutorGateway taskExecutorGateway =
Expand Down Expand Up @@ -1405,16 +1404,52 @@ public void testReclaimInactiveSlotsOnEmptyRequirements() throws Exception {
slotManager.processResourceRequirements(createResourceRequirements(jobId, 2));
assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));

// decrease requirements, which should not trigger slots being reclaimed
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
// set requirements to 0, which should not trigger slots being reclaimed
slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
assertThat(freeInactiveSlotsJobIdFuture.isDone(), is(false));

// clear requirements, which should trigger slots being reclaimed
slotManager.processResourceRequirements(ResourceRequirements.empty(jobId, "foobar"));
slotManager.clearResourceRequirements(jobId);
assertThat(freeInactiveSlotsJobIdFuture.get(), is(jobId));
}
}

/**
 * Verifies that clearing the requirements of a job leaves the resource tracker without any
 * missing resources.
 */
@Test
public void testClearRequirementsClearsResourceTracker() throws Exception {
    final JobID jobId = new JobID();
    final ResourceTracker tracker = new DefaultResourceTracker();

    // records the job id passed to the task executor when inactive slots are freed
    final CompletableFuture<JobID> reclaimedJobFuture = new CompletableFuture<>();
    final TestingTaskExecutorGateway gateway =
            new TestingTaskExecutorGatewayBuilder()
                    .setFreeInactiveSlotsConsumer(
                            reclaimedJobId -> reclaimedJobFuture.complete(reclaimedJobId))
                    .createTestingTaskExecutorGateway();

    try (final DeclarativeSlotManager slotManager =
            createDeclarativeSlotManagerBuilder()
                    .setResourceTracker(tracker)
                    .buildAndStart(
                            ResourceManagerId.generate(),
                            ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                            new TestingResourceActionsBuilder().build())) {

        // register a task manager that reports one slot already allocated to the job
        final TaskExecutorConnection connection = createTaskExecutorConnection(gateway);
        final SlotReport allocatedSlotReport =
                createSlotReportWithAllocatedSlots(connection.getResourceID(), jobId, 1);
        slotManager.registerTaskManager(
                connection, allocatedSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);

        // declare more slots than are available, then clear the requirements again
        slotManager.processResourceRequirements(createResourceRequirements(jobId, 2));
        slotManager.clearResourceRequirements(jobId);

        assertThat(tracker.getMissingResources().keySet(), empty());
    }
}

private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
for (int i = 0; i < numberSlots; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
Expand All @@ -42,12 +43,18 @@ public class TestingSlotManager implements SlotManager {

private final Consumer<Boolean> setFailUnfulfillableRequestConsumer;
private final Supplier<Map<WorkerResourceSpec, Integer>> getRequiredResourcesSupplier;
private final Consumer<ResourceRequirements> processRequirementsConsumer;
private final Consumer<JobID> clearRequirementsConsumer;

TestingSlotManager(
Consumer<Boolean> setFailUnfulfillableRequestConsumer,
Supplier<Map<WorkerResourceSpec, Integer>> getRequiredResourcesSupplier) {
Supplier<Map<WorkerResourceSpec, Integer>> getRequiredResourcesSupplier,
Consumer<ResourceRequirements> processRequirementsConsumer,
Consumer<JobID> clearRequirementsConsumer) {
this.setFailUnfulfillableRequestConsumer = setFailUnfulfillableRequestConsumer;
this.getRequiredResourcesSupplier = getRequiredResourcesSupplier;
this.processRequirementsConsumer = processRequirementsConsumer;
this.clearRequirementsConsumer = clearRequirementsConsumer;
}

@Override
Expand Down Expand Up @@ -115,7 +122,14 @@ public void start(
public void suspend() {}

@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {}
// Forwards the job id to the consumer supplied at construction time so tests can observe the call.
public void clearResourceRequirements(JobID jobId) {
clearRequirementsConsumer.accept(jobId);
}

/**
* Forwards the given requirements to the consumer supplied at construction time so tests can
* observe the call.
*
* @param resourceRequirements requirements passed on to the configured consumer
*/
@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
processRequirementsConsumer.accept(resourceRequirements);
}

@Override
public boolean registerSlotRequest(SlotRequest slotRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.slots.ResourceRequirements;

import java.util.Collections;
import java.util.Map;
Expand All @@ -31,6 +33,8 @@ public class TestingSlotManagerBuilder {
private Consumer<Boolean> setFailUnfulfillableRequestConsumer = ignored -> {};
private Supplier<Map<WorkerResourceSpec, Integer>> getRequiredResourcesSupplier =
() -> Collections.emptyMap();
private Consumer<ResourceRequirements> processRequirementsConsumer = ignored -> {};
private Consumer<JobID> clearRequirementsConsumer = ignored -> {};

public TestingSlotManagerBuilder setSetFailUnfulfillableRequestConsumer(
Consumer<Boolean> setFailUnfulfillableRequestConsumer) {
Expand All @@ -44,8 +48,23 @@ public TestingSlotManagerBuilder setGetRequiredResourcesSupplier(
return this;
}

/**
* Sets the consumer stored in this builder's {@code processRequirementsConsumer} field.
*
* @param processRequirementsConsumer consumer of {@link ResourceRequirements} to store
* @return this builder, for call chaining
*/
public TestingSlotManagerBuilder setProcessRequirementsConsumer(
Consumer<ResourceRequirements> processRequirementsConsumer) {
this.processRequirementsConsumer = processRequirementsConsumer;
return this;
}

/**
* Sets the consumer stored in this builder's {@code clearRequirementsConsumer} field.
*
* @param clearRequirementsConsumer consumer of {@link JobID} to store
* @return this builder, for call chaining
*/
public TestingSlotManagerBuilder setClearRequirementsConsumer(
Consumer<JobID> clearRequirementsConsumer) {
this.clearRequirementsConsumer = clearRequirementsConsumer;
return this;
}

public TestingSlotManager createSlotManager() {
return new TestingSlotManager(
setFailUnfulfillableRequestConsumer, getRequiredResourcesSupplier);
setFailUnfulfillableRequestConsumer,
getRequiredResourcesSupplier,
processRequirementsConsumer,
clearRequirementsConsumer);
}
}

0 comments on commit 9fd6ecf

Please sign in to comment.