diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 7e7b21eaa3ec7..44df29b114612 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -267,7 +267,7 @@ public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) { } // ------------------------------------------------------------------------ - // Slot De-allocation + // Slot releasing & offering // ------------------------------------------------------------------------ /** @@ -323,10 +323,6 @@ private FlinkCompletableFuture pollPendingRequest(final SlotDesc return null; } - // ------------------------------------------------------------------------ - // Slot Releasing - // ------------------------------------------------------------------------ - /** * Release slot to TaskManager, called for finished tasks or canceled jobs. * @@ -340,10 +336,6 @@ public void releaseSlot(final Slot slot) { } } - // ------------------------------------------------------------------------ - // Slot Offering - // ------------------------------------------------------------------------ - /** * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation @@ -400,6 +392,39 @@ public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor s } } + // ------------------------------------------------------------------------ + // Error Handling + // ------------------------------------------------------------------------ + + /** + * Fail the specified allocation and release the corresponding slot if we have one. + * This may triggered by JobManager when some slot allocation failed with timeout. + * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot, + * and decided to take it back. + * + * @param allocationID Represents the allocation which should be failed + * @param cause The cause of the failure + */ + public void failAllocation(final AllocationID allocationID, final Exception cause) { + synchronized (lock) { + // 1. check whether the allocation still pending + Tuple2> pendingRequest = + pendingRequests.get(allocationID); + if (pendingRequest != null) { + pendingRequest.f1.completeExceptionally(cause); + return; + } + + // 2. check whether we have a free slot corresponding to this allocation id + // TODO: add allocation id to slot descriptor, so we can remove it by allocation id + + // 3. check whether we have a in-use slot corresponding to this allocation id + // TODO: needs mechanism to release the in-use Slot but don't return it back to this pool + + // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase + } + } + // ------------------------------------------------------------------------ // Resource // ------------------------------------------------------------------------ @@ -464,12 +489,13 @@ public void disconnectResourceManager() { */ static class AllocatedSlots { - /** All allocated slots organized by TaskManager */ + /** All allocated slots organized by TaskManager's id */ private final Map> allocatedSlotsByResource; /** All allocated slots organized by Slot object */ private final Map allocatedSlots; + /** All allocated slot descriptors organized by Slot object */ private final Map allocatedSlotsWithDescriptor; /** All allocated slots organized by AllocationID */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7bcfb3ac57c40..3c6bbd37cc54d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotDescriptor; import org.apache.flink.runtime.instance.SlotPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -85,6 +86,7 @@ import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; @@ -95,7 +97,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -663,13 +667,51 @@ public ClassloadingProps requestClassloadingProps() throws Exception { } @RpcMethod - public Iterable offerSlots(final Iterable slots, UUID leaderId) { - throw new UnsupportedOperationException("Has to be implemented."); + public Iterable offerSlots(final ResourceID taskManagerId, + final Iterable slots, final UUID leaderId) throws Exception + { + if (!this.leaderSessionID.equals(leaderId)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderId); + } + + Tuple2 taskManager = registeredTaskManagers.get(taskManagerId); + if (taskManager == null) { + throw new Exception("Unknown TaskManager " + taskManagerId); + } + + final Set acceptedSlotOffers = new HashSet<>(4); + for (SlotOffer slotOffer : slots) { + final SlotDescriptor slotDescriptor = new SlotDescriptor( + jobGraph.getJobID(), + taskManager.f0, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1) + if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) { + acceptedSlotOffers.add(slotOffer); + } + } + + return acceptedSlotOffers; } @RpcMethod - public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) { - throw new UnsupportedOperationException("Has to be implemented."); + public void failSlot(final ResourceID taskManagerId, + final AllocationID allocationId, + final UUID leaderId, + final Exception cause) throws Exception + { + if (!this.leaderSessionID.equals(leaderId)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderId); + } + + if (!registeredTaskManagers.containsKey(taskManagerId)) { + throw new Exception("Unknown TaskManager " + taskManagerId); + } + + slotPool.failAllocation(allocationId, cause); } @RpcMethod diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 8925d940b6268..2d7ebb986bf7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -166,21 +167,30 @@ void notifyKvStateUnregistered( /** * Offer the given slots to the job manager. The response contains the set of accepted slots. * - * @param slots to offer to the job manager - * @param leaderId identifying the job leader - * @param timeout for the rpc call + * @param taskManagerId identifying the task manager + * @param slots to offer to the job manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call * @return Future set of accepted slots. */ - Future> offerSlots(final Iterable slots, UUID leaderId, @RpcTimeout final Time timeout); + Future> offerSlots( + final ResourceID taskManagerId, + final Iterable slots, + final UUID leaderId, + @RpcTimeout final Time timeout); /** * Fail the slot with the given allocation id and cause. * - * @param allocationId identifying the slot to fail - * @param leaderId identifying the job leader - * @param cause of the failing + * @param taskManagerId identifying the task manager + * @param allocationId identifying the slot to fail + * @param leaderId identifying the job leader + * @param cause of the failing */ - void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause); + void failSlot(final ResourceID taskManagerId, + final AllocationID allocationId, + final UUID leaderId, + final Exception cause); /** * Register the task manager at the job manager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 31228046fef3a..f1a50736cd320 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -69,7 +69,7 @@ * ResourceManager implementation. The resource manager is responsible for resource de-/allocation * and bookkeeping. * - * It offers the following methods as part of its rpc interface to interact with the him remotely: + * It offers the following methods as part of its rpc interface to interact with him remotely: *
    *
  • {@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager
  • *
  • {@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager
  • diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 5146e5b77feb0..679324baa9cd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -72,6 +72,8 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotActions; import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException; import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlot; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -660,47 +662,49 @@ private void offerSlotsToJobManager(final JobID jobId) { final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway(); - final Iterator reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); + final Iterator reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); final UUID leaderId = jobManagerConnection.getLeaderId(); - final Collection reservedSlots = new HashSet<>(2); + final Collection reservedSlots = new HashSet<>(2); while (reservedSlotsIterator.hasNext()) { - reservedSlots.add(reservedSlotsIterator.next()); + reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer()); } - Future> acceptedSlotsFuture = jobMasterGateway.offerSlots( + Future> acceptedSlotsFuture = jobMasterGateway.offerSlots( + getResourceID(), reservedSlots, leaderId, taskManagerConfiguration.getTimeout()); - acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction>() { + acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction>() { @Override - public void accept(Iterable acceptedSlots) { + public void accept(Iterable acceptedSlots) { // check if the response is still valid if (isJobManagerConnectionValid(jobId, leaderId)) { // mark accepted slots active - for (AllocationID acceptedSlot: acceptedSlots) { + for (SlotOffer acceptedSlot: acceptedSlots) { try { - if (!taskSlotTable.markSlotActive(acceptedSlot)) { + if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) { // the slot is either free or releasing at the moment final String message = "Could not mark slot " + jobId + " active."; log.debug(message); - jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message)); + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), + leaderId, new Exception(message)); } // remove the assigned slots so that we can free the left overs reservedSlots.remove(acceptedSlot); } catch (SlotNotFoundException e) { log.debug("Could not mark slot {} active.", acceptedSlot, e); - jobMasterGateway.failSlot(acceptedSlot, leaderId, e); + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e); } } final Exception e = new Exception("The slot was rejected by the JobManager."); - for (AllocationID rejectedSlot: reservedSlots) { - freeSlot(rejectedSlot, e); + for (SlotOffer rejectedSlot: reservedSlots) { + freeSlot(rejectedSlot.getAllocationId(), e); } } else { // discard the response since there is a new leader for the job @@ -718,8 +722,8 @@ public Void apply(Throwable throwable) { offerSlotsToJobManager(jobId); } else { // We encountered an exception. Free the slots and return them to the RM. - for (AllocationID reservedSlot: reservedSlots) { - freeSlot(reservedSlot, throwable); + for (SlotOffer reservedSlot: reservedSlots) { + freeSlot(reservedSlot.getAllocationId(), throwable); } } @@ -870,7 +874,7 @@ public Void apply(Throwable value) { private void unregisterTaskAndNotifyFinalState( final UUID jobMasterLeaderId, - final JobMasterGateway jobMasterGateway, + final JobMasterGateway jobMasterGateway, final ExecutionAttemptID executionAttemptID) { Task task = taskSlotTable.removeTask(executionAttemptID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java new file mode 100644 index 0000000000000..f8d7e6ca2eea5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Describe the slot offering to job manager provided by task manager. + */ +public class SlotOffer implements Serializable { + + private static final long serialVersionUID = -7067814231108250971L; + + /** Allocation id of this slot, this would be the only identifier for this slot offer */ + private AllocationID allocationId; + + /** Index of the offered slot */ + private final int slotIndex; + + /** The resource profile of the offered slot */ + private final ResourceProfile resourceProfile; + + public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) { + Preconditions.checkArgument(0 <= index, "The index must be greater than 0."); + this.allocationId = Preconditions.checkNotNull(allocationID); + this.slotIndex = index; + this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + } + + public AllocationID getAllocationId() { + return allocationId; + } + + public int getSlotIndex() { + return slotIndex; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotOffer slotOffer = (SlotOffer) o; + return allocationId.equals(slotOffer.allocationId); + } + + @Override + public int hashCode() { + return allocationId.hashCode(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java index 0942772494d59..e12c15b5c411c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java @@ -286,4 +286,17 @@ public boolean markReleasing() { state = TaskSlotState.RELEASING; return true; } + + /** + * Generate the slot offer from this TaskSlot. + * + * @return The sot offer which this task slot can provide + */ + public SlotOffer generateSlotOffer() { + Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state, + "The task slot is not in state active or allocated."); + Preconditions.checkState(allocationId != null, "The task slot are not allocated"); + + return new SlotOffer(allocationId, index, resourceProfile); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 88123b4cc5724..88b83a0c707f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,7 +70,7 @@ public class TaskSlotTable implements TimeoutListener { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - + /** Whether the table has been started */ private boolean started; @@ -250,7 +250,7 @@ public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { */ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); - + if (LOG.isDebugEnabled()) { LOG.debug("Free slot {}.", allocationId, cause); } else { @@ -370,13 +370,13 @@ public boolean hasAllocatedSlots(JobID jobId) { } /** - * Return an iterator of allocated slots (their allocation ids) for the given job id. + * Return an iterator of allocated slots for the given job id. * * @param jobId for which to return the allocated slots - * @return Iterator of allocation ids of allocated slots. + * @return Iterator of allocated slots. */ - public Iterator getAllocatedSlots(JobID jobId) { - return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED); + public Iterator getAllocatedSlots(JobID jobId) { + return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 55cc1421fbbcd..4d73a4b8efcb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -406,6 +407,7 @@ public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingExce final AllocationID allocationId = new AllocationID(); final SlotID slotId = new SlotID(resourceId, 0); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); try { TaskExecutor taskManager = new TaskExecutor( @@ -440,7 +442,11 @@ public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingExce jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); // the job leader should get the allocation id offered - verify(jobMasterGateway).offerSlots((Iterable)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class)); + verify(jobMasterGateway).offerSlots( + any(ResourceID.class), + (Iterable)Matchers.argThat(contains(slotOffer)), + eq(jobManagerLeaderId), + any(Time.class)); } finally { // check if a concurrent error occurred testingFatalErrorHandler.rethrowException(); @@ -496,6 +502,9 @@ public void testSlotAcceptance() throws Exception { final AllocationID allocationId1 = new AllocationID(); final AllocationID allocationId2 = new AllocationID(); + final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN); + final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN); + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( @@ -506,8 +515,9 @@ public void testSlotAcceptance() throws Exception { )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); - when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) - .thenReturn(FlinkCompletableFuture.completed((Iterable)Collections.singleton(allocationId1))); + when(jobMasterGateway.offerSlots( + any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) + .thenReturn(FlinkCompletableFuture.completed((Iterable)Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway);