Skip to content

Commit

Permalink
[FLINK-4839] [cluster management] JobManager handle TaskManager's slo…
Browse files Browse the repository at this point in the history
…t offering

This closes apache#2647 apache#2643.
  • Loading branch information
KurtYoung authored and StephanEwen committed Dec 23, 2016
1 parent a7ed9a5 commit af924b4
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
}

// ------------------------------------------------------------------------
// Slot De-allocation
// Slot releasing & offering
// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -323,10 +323,6 @@ private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDesc
return null;
}

// ------------------------------------------------------------------------
// Slot Releasing
// ------------------------------------------------------------------------

/**
* Release slot to TaskManager, called for finished tasks or canceled jobs.
*
Expand All @@ -340,10 +336,6 @@ public void releaseSlot(final Slot slot) {
}
}

// ------------------------------------------------------------------------
// Slot Offering
// ------------------------------------------------------------------------

/**
* Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
Expand Down Expand Up @@ -400,6 +392,39 @@ public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor s
}
}

// ------------------------------------------------------------------------
// Error Handling
// ------------------------------------------------------------------------

/**
 * Fails the given allocation and releases the corresponding slot if this pool holds one.
 *
 * <p>This may be triggered by the JobManager when a slot allocation failed with a timeout,
 * or by the TaskManager when it finds out that something went wrong with the slot and
 * decides to take it back.
 *
 * <p>Currently only allocations that are still pending are handled: the future of the
 * pending request is completed exceptionally with the given cause. Free and in-use slots
 * are not yet covered (see the TODOs in the method body).
 *
 * @param allocationID Represents the allocation which should be failed
 * @param cause The cause of the failure
 */
public void failAllocation(final AllocationID allocationID, final Exception cause) {
    synchronized (lock) {
        final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pending =
            pendingRequests.get(allocationID);

        if (pending == null) {
            // The allocation is no longer pending, so it must already have produced a slot.

            // 2. check whether we have a free slot corresponding to this allocation id
            // TODO: add allocation id to slot descriptor, so we can remove it by allocation id

            // 3. check whether we have an in-use slot corresponding to this allocation id
            // TODO: needs mechanism to release the in-use Slot but don't return it back to this pool

            // TODO: add some unit tests when the previous two are ready, the allocation may fail at any phase
            return;
        }

        // 1. the allocation is still pending: fail the future handed out to the requester
        // NOTE(review): the entry is looked up but not removed from pendingRequests here;
        // confirm that it is cleaned up elsewhere once the future completes.
        final FlinkCompletableFuture<SlotDescriptor> pendingFuture = pending.f1;
        pendingFuture.completeExceptionally(cause);
    }
}

// ------------------------------------------------------------------------
// Resource
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -464,12 +489,13 @@ public void disconnectResourceManager() {
*/
static class AllocatedSlots {

/** All allocated slots organized by TaskManager */
/** All allocated slots organized by TaskManager's id */
private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;

/** All allocated slots organized by Slot object */
private final Map<Slot, AllocationID> allocatedSlots;

/** All allocated slot descriptors organized by Slot object */
private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;

/** All allocated slots organized by AllocationID */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotDescriptor;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
Expand All @@ -95,7 +97,9 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -663,13 +667,51 @@ public ClassloadingProps requestClassloadingProps() throws Exception {
}

@RpcMethod
public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> slots, UUID leaderId) {
throw new UnsupportedOperationException("Has to be implemented.");
public Iterable<SlotOffer> offerSlots(
        final ResourceID taskManagerId,
        final Iterable<SlotOffer> slots,
        final UUID leaderId) throws Exception {

    // Only accept offers that are addressed to the current leader session.
    final boolean leaderMatches = this.leaderSessionID.equals(leaderId);
    if (!leaderMatches) {
        throw new Exception("Leader id not match, expected: " + this.leaderSessionID
            + ", actual: " + leaderId);
    }

    // The offering TaskManager must have registered at this JobMaster before.
    final Tuple2<TaskManagerLocation, TaskExecutorGateway> registration =
        registeredTaskManagers.get(taskManagerId);
    if (null == registration) {
        throw new Exception("Unknown TaskManager " + taskManagerId);
    }

    // Hand every offer to the slot pool and remember the ones it accepted;
    // the accepted subset is the reply sent back to the TaskManager.
    final Set<SlotOffer> accepted = new HashSet<>(4);
    for (final SlotOffer offer : slots) {
        final SlotDescriptor descriptor = new SlotDescriptor(
            jobGraph.getJobID(),
            registration.f0,
            offer.getSlotIndex(),
            offer.getResourceProfile(),
            null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1)

        final boolean taken = slotPool.offerSlot(offer.getAllocationId(), descriptor);
        if (taken) {
            accepted.add(offer);
        }
    }

    return accepted;
}

@RpcMethod
public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) {
throw new UnsupportedOperationException("Has to be implemented.");
public void failSlot(
        final ResourceID taskManagerId,
        final AllocationID allocationId,
        final UUID leaderId,
        final Exception cause) throws Exception {

    // Only accept the call if it is addressed to the current leader session.
    final boolean leaderMatches = this.leaderSessionID.equals(leaderId);
    if (!leaderMatches) {
        throw new Exception("Leader id not match, expected: " + this.leaderSessionID
            + ", actual: " + leaderId);
    }

    // The reporting TaskManager must have registered at this JobMaster before.
    final boolean knownTaskManager = registeredTaskManagers.containsKey(taskManagerId);
    if (!knownTaskManager) {
        throw new Exception("Unknown TaskManager " + taskManagerId);
    }

    // Delegate to the slot pool, which fails the allocation with the given cause.
    slotPool.failAllocation(allocationId, cause);
}

@RpcMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

Expand Down Expand Up @@ -166,21 +167,30 @@ void notifyKvStateUnregistered(
/**
* Offer the given slots to the job manager. The response contains the set of accepted slots.
*
* @param slots to offer to the job manager
* @param leaderId identifying the job leader
* @param timeout for the rpc call
* @param taskManagerId identifying the task manager
* @param slots to offer to the job manager
* @param leaderId identifying the job leader
* @param timeout for the rpc call
* @return Future set of accepted slots.
*/
Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> slots, UUID leaderId, @RpcTimeout final Time timeout);
Future<Iterable<SlotOffer>> offerSlots(
final ResourceID taskManagerId,
final Iterable<SlotOffer> slots,
final UUID leaderId,
@RpcTimeout final Time timeout);

/**
* Fail the slot with the given allocation id and cause.
*
* @param allocationId identifying the slot to fail
* @param leaderId identifying the job leader
* @param cause of the failing
* @param taskManagerId identifying the task manager
* @param allocationId identifying the slot to fail
* @param leaderId identifying the job leader
* @param cause of the failing
*/
void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause);
void failSlot(final ResourceID taskManagerId,
final AllocationID allocationId,
final UUID leaderId,
final Exception cause);

/**
* Register the task manager at the job manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
* and bookkeeping.
*
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* It offers the following methods as part of its rpc interface to interact with it remotely:
* <ul>
* <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
Expand Down Expand Up @@ -660,47 +662,49 @@ private void offerSlotsToJobManager(final JobID jobId) {

final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final UUID leaderId = jobManagerConnection.getLeaderId();

final Collection<AllocationID> reservedSlots = new HashSet<>(2);
final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

while (reservedSlotsIterator.hasNext()) {
reservedSlots.add(reservedSlotsIterator.next());
reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
}

Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
leaderId,
taskManagerConfiguration.getTimeout());

acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() {
acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
@Override
public void accept(Iterable<AllocationID> acceptedSlots) {
public void accept(Iterable<SlotOffer> acceptedSlots) {
// check if the response is still valid
if (isJobManagerConnectionValid(jobId, leaderId)) {
// mark accepted slots active
for (AllocationID acceptedSlot: acceptedSlots) {
for (SlotOffer acceptedSlot: acceptedSlots) {
try {
if (!taskSlotTable.markSlotActive(acceptedSlot)) {
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
leaderId, new Exception(message));
}

// remove the assigned slots so that we can free the left overs
reservedSlots.remove(acceptedSlot);
} catch (SlotNotFoundException e) {
log.debug("Could not mark slot {} active.", acceptedSlot, e);
jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e);
}
}

final Exception e = new Exception("The slot was rejected by the JobManager.");

for (AllocationID rejectedSlot: reservedSlots) {
freeSlot(rejectedSlot, e);
for (SlotOffer rejectedSlot: reservedSlots) {
freeSlot(rejectedSlot.getAllocationId(), e);
}
} else {
// discard the response since there is a new leader for the job
Expand All @@ -718,8 +722,8 @@ public Void apply(Throwable throwable) {
offerSlotsToJobManager(jobId);
} else {
// We encountered an exception. Free the slots and return them to the RM.
for (AllocationID reservedSlot: reservedSlots) {
freeSlot(reservedSlot, throwable);
for (SlotOffer reservedSlot: reservedSlots) {
freeSlot(reservedSlot.getAllocationId(), throwable);
}
}

Expand Down Expand Up @@ -870,7 +874,7 @@ public Void apply(Throwable value) {

private void unregisterTaskAndNotifyFinalState(
final UUID jobMasterLeaderId,
final JobMasterGateway jobMasterGateway,
final JobMasterGateway jobMasterGateway,
final ExecutionAttemptID executionAttemptID) {

Task task = taskSlotTable.removeTask(executionAttemptID);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor.slot;

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

/**
 * Describes a slot that a task manager offers to a job manager.
 *
 * <p>Instances are immutable. Equality and hash code are based solely on the allocation id,
 * so two offers with the same allocation id are considered equal regardless of their slot
 * index and resource profile.
 */
public class SlotOffer implements Serializable {

    private static final long serialVersionUID = -7067814231108250971L;

    /** Allocation id of this slot; this is the only identifier of this slot offer. */
    private final AllocationID allocationId;

    /** Index of the offered slot. */
    private final int slotIndex;

    /** The resource profile of the offered slot. */
    private final ResourceProfile resourceProfile;

    /**
     * Creates a new slot offer.
     *
     * @param allocationID allocation id identifying the offered slot, must not be null
     * @param index index of the offered slot, must not be negative
     * @param resourceProfile resource profile of the offered slot, must not be null
     */
    public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) {
        // Index 0 is valid: the check accepts every non-negative index.
        Preconditions.checkArgument(0 <= index, "The index must be greater than or equal to 0.");
        this.allocationId = Preconditions.checkNotNull(allocationID);
        this.slotIndex = index;
        this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
    }

    /**
     * @return the allocation id identifying this slot offer
     */
    public AllocationID getAllocationId() {
        return allocationId;
    }

    /**
     * @return the index of the offered slot
     */
    public int getSlotIndex() {
        return slotIndex;
    }

    /**
     * @return the resource profile of the offered slot
     */
    public ResourceProfile getResourceProfile() {
        return resourceProfile;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        // Identity of an offer is defined by its allocation id only.
        SlotOffer slotOffer = (SlotOffer) o;
        return allocationId.equals(slotOffer.allocationId);
    }

    @Override
    public int hashCode() {
        // Must stay consistent with equals(): derived from the allocation id only.
        return allocationId.hashCode();
    }

    @Override
    public String toString() {
        return "SlotOffer{" +
            "allocationId=" + allocationId +
            ", slotIndex=" + slotIndex +
            ", resourceProfile=" + resourceProfile +
            '}';
    }
}
Loading

0 comments on commit af924b4

Please sign in to comment.