Skip to content

Commit

Permalink
[FLINK-14189][runtime] Extend TaskExecutor to support dynamic slot al…
Browse files Browse the repository at this point in the history
…location
  • Loading branch information
xintongsong authored and azagrebin committed Dec 8, 2019
1 parent da071ce commit 516aee3
Show file tree
Hide file tree
Showing 18 changed files with 600 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static String assembleDynamicConfigsStr(final Map<String, String> config
}

// ------------------------------------------------------------------------
// Generating Default Slot Resource Profiles
// Generating Slot Resource Profiles
// ------------------------------------------------------------------------

public static List<ResourceProfile> createDefaultWorkerSlotProfiles(
Expand All @@ -111,6 +111,16 @@ public static ResourceProfile generateDefaultSlotResourceProfile(
.build();
}

public static ResourceProfile generateTotalAvailableResourceProfile(TaskExecutorResourceSpec taskExecutorResourceSpec) {
return ResourceProfile.newBuilder()
.setCpuCores(taskExecutorResourceSpec.getCpuCores())
.setTaskHeapMemory(taskExecutorResourceSpec.getTaskHeapSize())
.setTaskOffHeapMemory(taskExecutorResourceSpec.getTaskOffHeapSize())
.setManagedMemory(taskExecutorResourceSpec.getManagedMemorySize())
.setShuffleMemory(taskExecutorResourceSpec.getShuffleMemSize())
.build();
}

// ------------------------------------------------------------------------
// Memory Configuration Calculations
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.clusterframework.types;

import org.apache.flink.util.Preconditions;

/**
* Budget manager for {@link ResourceProfile}.
*
* <p>For a given total resource budget, this class handles reserving and releasing resources
* from the budget, and rejects reservations if they cannot be satisfied by the remaining budget.
*
* <p>Both the total budget and the reservations are in the form of {@link ResourceProfile}.
*/
public class ResourceBudgetManager {

private final ResourceProfile totalBudget;

private ResourceProfile availableBudget;

public ResourceBudgetManager(final ResourceProfile totalBudget) {
checkResourceProfileNotNullOrUnknown(totalBudget);
this.totalBudget = totalBudget;
this.availableBudget = totalBudget;
}

public ResourceProfile getTotalBudget() {
return totalBudget;
}

public ResourceProfile getAvailableBudget() {
return availableBudget;
}

public boolean reserve(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
if (!availableBudget.isMatching(reservation)) {
return false;
}

availableBudget = availableBudget.subtract(reservation);
return true;
}

public boolean release(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
ResourceProfile newAvailableBudget = availableBudget.merge(reservation);
if (!totalBudget.isMatching(newAvailableBudget)) {
return false;
}

availableBudget = newAvailableBudget;
return true;
}

private static void checkResourceProfileNotNullOrUnknown(final ResourceProfile resourceProfile) {
Preconditions.checkNotNull(resourceProfile);
Preconditions.checkArgument(!resourceProfile.equals(ResourceProfile.UNKNOWN));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public SlotID(ResourceID resourceId, int slotNumber) {
this.slotNumber = slotNumber;
}

private SlotID(ResourceID resourceID) {
this.resourceId = checkNotNull(resourceID, "ResourceID must not be null");
this.slotNumber = -1;
}

// ------------------------------------------------------------------------

@Override
Expand Down Expand Up @@ -84,4 +89,11 @@ public int hashCode() {
public String toString() {
return resourceId + "_" + slotNumber;
}

/**
* Generate a SlotID without actual slot index for dynamic slot allocation.
*/
public static SlotID generateDynamicSlotID(ResourceID resourceID) {
return new SlotID(resourceID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
Expand Down Expand Up @@ -818,6 +819,7 @@ public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
Expand All @@ -834,7 +836,7 @@ public CompletableFuture<Acknowledge> requestSlot(
}

if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -54,6 +55,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param slotId slot id for the request
* @param jobId for which to request a slot
* @param allocationId id for the request
* @param resourceProfile of requested slot, used only for dynamic slot allocation and will be ignored otherwise
* @param targetAddress to which to offer the requested slots
* @param resourceManagerId current leader id of the ResourceManager
* @param timeout for the operation
Expand All @@ -63,6 +65,7 @@ CompletableFuture<Acknowledge> requestSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
String targetAddress,
ResourceManagerId resourceManagerId,
@RpcTimeout Time timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
Expand All @@ -33,7 +32,6 @@
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand All @@ -46,11 +44,8 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
Expand Down Expand Up @@ -288,21 +283,15 @@ private static TaskSlotTable createTaskSlotTable(
final TaskExecutorResourceSpec taskExecutorResourceSpec,
final long timerServiceShutdownTimeout,
final int pageSize) {
final List<ResourceProfile> resourceProfiles =
TaskExecutorResourceUtils.createDefaultWorkerSlotProfiles(taskExecutorResourceSpec, numberOfSlots);
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
timerServiceShutdownTimeout);
return new TaskSlotTable(createTaskSlotsFromResources(resourceProfiles, pageSize), timerService);
}

private static List<TaskSlot> createTaskSlotsFromResources(
List<ResourceProfile> resourceProfiles,
int memoryPageSize) {
return IntStream
.range(0, resourceProfiles.size())
.mapToObj(index -> new TaskSlot(index, resourceProfiles.get(index), memoryPageSize))
.collect(Collectors.toList());
return new TaskSlotTable(
numberOfSlots,
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
pageSize,
timerService);
}

private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,26 @@ public class TaskSlot implements AutoCloseable {
private TaskSlotState state;

/** Job id to which the slot has been allocated; null if not allocated. */
private JobID jobId;
private final JobID jobId;

/** Allocation id of this slot; null if not allocated. */
private AllocationID allocationId;

public TaskSlot(final int index, final ResourceProfile resourceProfile, final int memoryPageSize) {
private final AllocationID allocationId;

public TaskSlot(
final int index,
final ResourceProfile resourceProfile,
final int memoryPageSize,
final JobID jobId,
final AllocationID allocationId) {
Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");
this.index = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);

this.tasks = new HashMap<>(4);
this.state = TaskSlotState.FREE;
this.state = TaskSlotState.ALLOCATED;

this.jobId = null;
this.allocationId = null;
this.jobId = jobId;
this.allocationId = allocationId;

this.memoryManager = createMemoryManager(resourceProfile, memoryPageSize);
}
Expand Down Expand Up @@ -119,10 +124,6 @@ public boolean isEmpty() {
return tasks.isEmpty();
}

public boolean isFree() {
return TaskSlotState.FREE == state;
}

public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) {
Preconditions.checkNotNull(activeJobId);
Preconditions.checkNotNull(activeAllocationId);
Expand Down Expand Up @@ -209,39 +210,6 @@ public void clear() {
tasks.clear();
}

/**
* Allocate the task slot for the given job and allocation id. If the slot could be allocated,
* or is already allocated/active for the given job and allocation id, then the method returns
* true. Otherwise it returns false.
*
* <p>A slot can only be allocated if it's current state is free.
*
* @param newJobId to allocate the slot for
* @param newAllocationId to identify the slot allocation
* @return True if the slot was allocated for the given job and allocation id; otherwise false
*/
public boolean allocate(JobID newJobId, AllocationID newAllocationId) {
if (TaskSlotState.FREE == state) {
// sanity checks
Preconditions.checkState(allocationId == null);
Preconditions.checkState(jobId == null);

this.jobId = Preconditions.checkNotNull(newJobId);
this.allocationId = Preconditions.checkNotNull(newAllocationId);

state = TaskSlotState.ALLOCATED;

return true;
} else if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
Preconditions.checkNotNull(newJobId);
Preconditions.checkNotNull(newAllocationId);

return newJobId.equals(jobId) && newAllocationId.equals(allocationId);
} else {
return false;
}
}

/**
* Mark this slot as active. A slot can only be marked active if it's in state allocated.
*
Expand Down Expand Up @@ -275,24 +243,6 @@ public boolean markInactive() {
}
}

/**
* Mark the slot as free. A slot can only be marked as free if it's empty.
*
* @return True if the new state is free; otherwise false
*/
public boolean markFree() {
if (isEmpty()) {
state = TaskSlotState.FREE;
verifyMemoryFreed();
this.jobId = null;
this.allocationId = null;

return true;
} else {
return false;
}
}

/**
* Mark this slot as releasing. A slot can always be marked as releasing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@
enum TaskSlotState {
ACTIVE, // Slot is in active use by a job manager responsible for a job
ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager
RELEASING, // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released
FREE // Slot is free
RELEASING // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released
}
Loading

0 comments on commit 516aee3

Please sign in to comment.