Skip to content

Commit

Permalink
[FLINK-25960][network] Distribute the data read buffers more fairly a…
Browse files Browse the repository at this point in the history
…mong result partitions for sort-shuffle

Currently, the data read buffers for sort-shuffle are allocated in a random way, so some result partitions may occupy too many buffers and starve the other result partitions. This patch improves the situation by not triggering data reading for a result partition that already occupies at least the average number of read buffers per result partition (the total number of read buffers divided by the number of result partitions currently registered as buffer requesters, with a minimum of one).

This closes apache#18631.
  • Loading branch information
wsry authored and gaoyunhaii committed Feb 9, 2022
1 parent c1f5c53 commit f847372
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -71,6 +73,9 @@ public class BatchShuffleReadBufferPool {
/** The number of buffers to be returned for a single request. */
private final int numBuffersPerRequest;

/** All requesters which need to request buffers from this pool currently. */
private final Set<Object> bufferRequesters = ConcurrentHashMap.newKeySet();

/** All available buffers in this buffer pool currently. */
@GuardedBy("buffers")
private final Queue<MemorySegment> buffers = new ArrayDeque<>();
Expand Down Expand Up @@ -181,6 +186,18 @@ public void initialize() {
bufferSize);
}

/**
 * Registers the given requester with this buffer pool by adding it to the set of current
 * buffer requesters. The size of that set is the divisor used by {@link
 * #getAverageBuffersPerRequester()}, so every registration lowers the reported average share.
 *
 * <p>The requesters are kept in a set, so registering an object that is already registered
 * does not change the requester count.
 *
 * @param requester the object which is going to request buffers from this pool
 */
public void registerRequester(Object requester) {
bufferRequesters.add(requester);
}

/**
 * Unregisters the given requester from this buffer pool by removing it from the set of current
 * buffer requesters, so that it is no longer counted by {@link
 * #getAverageBuffersPerRequester()}.
 *
 * <p>Removing an object that is not currently registered leaves the set unchanged.
 *
 * @param requester the object which no longer needs to request buffers from this pool
 */
public void unregisterRequester(Object requester) {
bufferRequesters.remove(requester);
}

/**
 * Computes the average number of buffers available to each registered requester, i.e. the
 * total number of buffers of this pool divided (integer division) by the number of currently
 * registered requesters.
 *
 * @return the average number of buffers per requester, never less than 1
 */
public int getAverageBuffersPerRequester() {
    // With no registered requester, divide by 1 instead of 0.
    final int numRequesters = Math.max(1, bufferRequesters.size());
    final int averageBuffers = numTotalBuffers / numRequesters;
    // Integer division yields 0 when there are more requesters than buffers; report at least 1.
    return Math.max(1, averageBuffers);
}

/**
* Requests a collection of buffers (determined by {@link #numBuffersPerRequest}) from this
* buffer pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ private void removeFinishedAndFailedReaders(
failedReaders.clear();

if (allReaders.isEmpty()) {
bufferPool.unregisterRequester(this);
closeFileChannels();
}

Expand Down Expand Up @@ -303,6 +304,9 @@ SortMergeSubpartitionReader createSubpartitionReader(
PartitionedFileReader fileReader = createFileReader(resultFile, targetSubpartition);
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(availabilityListener, fileReader);
if (allReaders.isEmpty()) {
bufferPool.registerRequester(this);
}
allReaders.add(subpartitionReader);
subpartitionReader
.getReleaseFuture()
Expand Down Expand Up @@ -370,8 +374,8 @@ private void mayTriggerReading() {

if (!isRunning
&& !allReaders.isEmpty()
&& numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
<= maxRequestedBuffers) {
&& numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
&& numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
isRunning = true;
ioExecutor.execute(this);
}
Expand Down

0 comments on commit f847372

Please sign in to comment.