Skip to content

Commit

Permalink
[FLINK-5793] [runtime] fix running slot may not be add to AllocatedMa…
Browse files Browse the repository at this point in the history
…p in SlotPool bug

This closes apache#3306
  • Loading branch information
tiemsn authored and StephanEwen committed Feb 15, 2017
1 parent 7477c5b commit 33ea78e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ private void internalReturnAllocatedSlot(Slot slot) {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());

pendingRequest.future().complete(createSimpleSlot(taskManagerSlot, Locality.UNKNOWN));
SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
allocatedSlots.add(newSlot);
pendingRequest.future().complete(newSlot);
}
else {
LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
Expand Down Expand Up @@ -626,6 +628,15 @@ private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
return result;
}

// ------------------------------------------------------------------------
// Methods for tests
// ------------------------------------------------------------------------

@VisibleForTesting
AllocatedSlots getAllocatedSlots() {
return allocatedSlots;
}

// ------------------------------------------------------------------------
// Helper classes
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testAllocateSimpleSlot() throws Exception {
assertEquals(resourceID, slot.getTaskManagerID());
assertEquals(jobId, slot.getJobID());
assertEquals(slotPool.getSlotOwner(), slot.getOwner());
assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
}

@Test
Expand Down Expand Up @@ -153,6 +154,7 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception {
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
}

@Test
Expand Down

0 comments on commit 33ea78e

Please sign in to comment.