Skip to content

Commit

Permalink
[FLINK-29298] LocalBufferPool request buffer from NetworkBufferPool h…
Browse files Browse the repository at this point in the history
…anging.

When the task thread polls the last buffer out of the LocalBufferPool and triggers the onGlobalPoolAvailable callback itself, the callback skips this notification (because the LocalBufferPool is still available at that moment). As a result, the BufferPool eventually becomes unavailable and never registers a callback with the NetworkBufferPool.

This closes apache#20924
  • Loading branch information
reswqa authored and xintongsong committed Dec 13, 2022
1 parent 4df6a39 commit 875f27e
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ class LocalBufferPool implements BufferPool {
@GuardedBy("availableMemorySegments")
private final AvailabilityHelper availabilityHelper = new AvailabilityHelper();

/**
* Indicates whether this {@link LocalBufferPool} has requested to be notified the next time the
* global pool becomes available, so that it can then request a buffer from the global pool.
*/
@GuardedBy("availableMemorySegments")
private boolean requestingWhenAvailable;
private boolean requestingNotificationOfGlobalPoolAvailable;

/**
* Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
Expand Down Expand Up @@ -232,14 +236,10 @@ class LocalBufferPool implements BufferPool {
this.maxBuffersPerChannel = maxBuffersPerChannel;
this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;

// Lock is only taken, because #checkAvailability asserts it. It's a small penalty for
// thread safety.
// Lock is only taken, because #checkAndUpdateAvailability asserts it. It's a small penalty
// for thread safety.
synchronized (this.availableMemorySegments) {
if (checkAvailability()) {
availabilityHelper.resetAvailable();
}

checkConsistentAvailability();
checkAndUpdateAvailability();
}
}

Expand Down Expand Up @@ -412,11 +412,7 @@ private MemorySegment requestMemorySegment(int targetChannel) {
}
}

if (!checkAvailability()) {
availabilityHelper.resetUnavailable();
}

checkConsistentAvailability();
checkAndUpdateAvailability();
}
return segment;
}
Expand Down Expand Up @@ -475,22 +471,22 @@ private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
* multiple {@link LocalBufferPool}s might wait on the future of the global pool, hence this
* method double-check if a new buffer is really needed at the time it becomes available.
*/
@GuardedBy("availableMemorySegments")
private void requestMemorySegmentFromGlobalWhenAvailable() {
assert Thread.holdsLock(availableMemorySegments);

if (requestingWhenAvailable) {
return;
}
requestingWhenAvailable = true;

checkState(
!requestingNotificationOfGlobalPoolAvailable,
"local buffer pool is already in the state of requesting memory segment from global when it is available.");
requestingNotificationOfGlobalPoolAvailable = true;
assertNoException(
networkBufferPool.getAvailableFuture().thenRun(this::onGlobalPoolAvailable));
}

private void onGlobalPoolAvailable() {
CompletableFuture<?> toNotify = null;
CompletableFuture<?> toNotify;
synchronized (availableMemorySegments) {
requestingWhenAvailable = false;
requestingNotificationOfGlobalPoolAvailable = false;
if (isDestroyed || availabilityHelper.isApproximatelyAvailable()) {
// there is currently no benefit to obtain buffer from global; give other pools
// precedent
Expand All @@ -502,9 +498,7 @@ private void onGlobalPoolAvailable() {
// #requestMemorySegmentFromGlobalWhenAvailable again if no segment could be fetched
// because of
// concurrent requests from different LocalBufferPools.
if (checkAvailability()) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
toNotify = checkAndUpdateAvailability();
}
mayNotifyAvailable(toNotify);
}
Expand All @@ -517,21 +511,48 @@ private boolean shouldBeAvailable() {
&& numberOfRequestedOverdraftMemorySegments == 0;
}

private boolean checkAvailability() {
/**
 * Re-evaluates this pool's availability via {@link #checkAvailability()} and brings {@code
 * availabilityHelper} in line with the result.
 *
 * <p>If the check reports that a notification from the global pool is needed, the callback is
 * registered through {@link #requestMemorySegmentFromGlobalWhenAvailable()}. The consistency of
 * the resulting state is verified before returning.
 *
 * <p>Must be called while holding the lock on {@code availableMemorySegments}.
 *
 * @return the future obtained from {@code availabilityHelper.getUnavailableToResetAvailable()}
 *     when the pool is available, or {@code null} when it is unavailable; callers that use it
 *     hand it to {@code mayNotifyAvailable} after leaving the synchronized block
 */
@GuardedBy("availableMemorySegments")
private CompletableFuture<?> checkAndUpdateAvailability() {
    assert Thread.holdsLock(availableMemorySegments);

    final AvailabilityStatus status = checkAvailability();

    final CompletableFuture<?> futureToComplete;
    if (!status.isAvailable()) {
        availabilityHelper.resetUnavailable();
        futureToComplete = null;
    } else {
        futureToComplete = availabilityHelper.getUnavailableToResetAvailable();
    }

    // Register for the global pool's availability only after the helper has been updated.
    if (status.isNeedRequestingNotificationOfGlobalPoolAvailable()) {
        requestMemorySegmentFromGlobalWhenAvailable();
    }

    checkConsistentAvailability();
    return futureToComplete;
}

@GuardedBy("availableMemorySegments")
private AvailabilityStatus checkAvailability() {
assert Thread.holdsLock(availableMemorySegments);

if (!availableMemorySegments.isEmpty()) {
return shouldBeAvailable();
return AvailabilityStatus.from(shouldBeAvailable(), false);
}
if (isRequestedSizeReached()) {
return false;
return AvailabilityStatus.UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
}

// There aren't availableMemorySegments and we continue to request new memory segment.
boolean needRequestingNotificationOfGlobalPoolAvailable = false;
// There aren't availableMemorySegments and we continue to request new memory segment from
// global pool.
if (!requestMemorySegmentFromGlobal()) {
requestMemorySegmentFromGlobalWhenAvailable();
// If we cannot get a buffer from the global pool, we should request one once it becomes
// available. Note that if such a request is already pending, there is no need to
// repeat it.
needRequestingNotificationOfGlobalPoolAvailable =
!requestingNotificationOfGlobalPoolAvailable;
}
return shouldBeAvailable();
return AvailabilityStatus.from(
shouldBeAvailable(), needRequestingNotificationOfGlobalPoolAvailable);
}

private void checkConsistentAvailability() {
Expand Down Expand Up @@ -633,7 +654,7 @@ public boolean addBufferListener(BufferListener listener) {

@Override
public void setNumBuffers(int numBuffers) {
CompletableFuture<?> toNotify = null;
CompletableFuture<?> toNotify;
synchronized (availableMemorySegments) {
checkArgument(
numBuffers >= numberOfRequiredMemorySegments,
Expand All @@ -648,18 +669,12 @@ public void setNumBuffers(int numBuffers) {
if (isDestroyed) {
// FLINK-19964: when two local buffer pools are released concurrently, one of them
// gets buffers assigned
// make sure that checkAvailability is not called as it would pro-actively acquire
// one buffer from NetworkBufferPool
// make sure that checkAndUpdateAvailability is not called as it would pro-actively
// acquire one buffer from NetworkBufferPool.
return;
}

if (checkAvailability()) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
} else {
availabilityHelper.resetUnavailable();
}

checkConsistentAvailability();
toNotify = checkAndUpdateAvailability();
}

mayNotifyAvailable(toNotify);
Expand Down Expand Up @@ -757,4 +772,51 @@ public void recycle(MemorySegment memorySegment) {
bufferPool.recycle(memorySegment, channel);
}
}

/**
 * Outcome of an availability check: the buffer pool's current ground-truth availability,
 * combined with whether a notification should be requested from the global pool for when it
 * becomes available.
 */
private enum AvailabilityStatus {
    AVAILABLE(true, false),
    UNAVAILABLE_NEED_REQUESTING_NOTIFICATION(false, true),
    UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION(false, false);

    /** Whether the {@link LocalBufferPool} is currently available. */
    private final boolean available;

    /**
     * Whether a notification should be requested from the global pool for when it becomes
     * available.
     */
    private final boolean needRequestingNotificationOfGlobalPoolAvailable;

    AvailabilityStatus(boolean isAvailable, boolean needsGlobalPoolNotification) {
        this.available = isAvailable;
        this.needRequestingNotificationOfGlobalPoolAvailable = needsGlobalPoolNotification;
    }

    /** Returns {@code true} if this status represents an available pool. */
    public boolean isAvailable() {
        return available;
    }

    /**
     * Returns {@code true} if this status asks for a notification to be requested from the
     * global pool.
     */
    public boolean isNeedRequestingNotificationOfGlobalPoolAvailable() {
        return needRequestingNotificationOfGlobalPoolAvailable;
    }

    /**
     * Maps the two flags to the matching constant.
     *
     * <p>The combination "available and needs notification" is rejected through {@code
     * checkState}; every other combination maps to exactly one constant.
     *
     * @param available whether the pool is currently available
     * @param needRequestingNotificationOfGlobalPoolAvailable whether a notification from the
     *     global pool should be requested
     * @return the constant describing the given combination
     */
    public static AvailabilityStatus from(
            boolean available, boolean needRequestingNotificationOfGlobalPoolAvailable) {
        if (!available) {
            return needRequestingNotificationOfGlobalPoolAvailable
                    ? UNAVAILABLE_NEED_REQUESTING_NOTIFICATION
                    : UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
        }

        checkState(
                !needRequestingNotificationOfGlobalPoolAvailable,
                "available local buffer pool should not request from global.");
        return AVAILABLE;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
Expand All @@ -28,6 +29,7 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -248,6 +250,37 @@ public void testRecycleAfterDestroy() {
}
}

/**
 * Regression test for FLINK-29298: a local buffer pool that is blocked waiting for buffers
 * must eventually obtain them once another pool recycles its buffers back, instead of hanging
 * forever.
 *
 * <p>The timeout is declared on the JUnit 4 {@code @Test} annotation. This method runs as a
 * JUnit 4 test, where the JUnit 5 {@code @Timeout} annotation is not enforced, so a regression
 * of the hang would otherwise block the build indefinitely.
 */
@Test(timeout = 30_000)
public void testRequestBuffersOnRecycle() throws Exception {
    // The first pool grabs 1023 segments, more than its 512 required ones.
    // NOTE(review): presumably this leaves the global pool (almost) drained -- confirm against
    // the size configured in the test set-up.
    BufferPool bufferPool1 = networkBufferPool.createBufferPool(512, 2048);
    List<MemorySegment> segments = new ArrayList<>();
    for (int i = 0; i < 1023; i++) {
        segments.add(bufferPool1.requestMemorySegmentBlocking());
    }

    // The second pool requests all of its 512 segments from a separate thread, using the
    // blocking request call.
    BufferPool bufferPool2 = networkBufferPool.createBufferPool(512, 512);
    List<MemorySegment> segments2 = new ArrayList<>();
    CheckedThread checkedThread =
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    for (int i = 0; i < 512; i++) {
                        segments2.add(bufferPool2.requestMemorySegmentBlocking());
                    }
                }
            };
    checkedThread.start();

    // Hand everything held by the first pool back while the second pool is requesting.
    for (MemorySegment segment : segments) {
        bufferPool1.recycle(segment);
    }
    bufferPool1.lazyDestroy();

    // Wait for the requesting thread to finish; its failures are propagated by sync().
    checkedThread.sync();

    for (MemorySegment segment : segments2) {
        bufferPool2.recycle(segment);
    }
    bufferPool2.lazyDestroy();
}

@Test
public void testRecycleExcessBuffersAfterRecycling() {
localBufferPool.setNumBuffers(numBuffers);
Expand Down

0 comments on commit 875f27e

Please sign in to comment.