[FLINK-16641][network] (Part#6) Enable to set network buffers per channel to 0

This PR makes it possible to set the number of network buffers per channel (taskmanager.network.memory.buffers-per-channel) to 0. Previously, the value could not be set to 0 because doing so caused a deadlock; FLINK-16641 fixes that problem, so the value can now be set to 0.
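For illustration only (not part of this commit): a minimal sketch of how the option could now be set to 0 programmatically. It uses the plain configuration key string rather than any particular option constant, and the same value can equally be put in flink-conf.yaml.

import org.apache.flink.configuration.Configuration;

public class ZeroBuffersPerChannelSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // With this change, 0 becomes a valid value: downstream input channels
        // then get no exclusive buffers, while each upstream outgoing channel
        // still uses max(1, configured value) = 1 buffer for performance.
        config.setInteger("taskmanager.network.memory.buffers-per-channel", 0);
        // The config object would then be passed to the cluster / MiniCluster
        // setup as usual (omitted here).
    }
}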
wsry authored and pnowojski committed Jul 12, 2021
1 parent 7d1bb5f commit 60d015c
Showing 17 changed files with 197 additions and 99 deletions.
@@ -30,7 +30,7 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -48,7 +48,7 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -154,7 +154,11 @@ public class NettyShuffleEnvironmentOptions {

/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input
* channel).
* channel). The minimum valid value that can be configured is 0. When 0 buffers-per-channel is
* configured, the exclusive network buffers used per downstream incoming channel will be 0, but
* for each upstream outgoing channel, max(1, configured value) will be used. In other words we
* ensure that, for performance reasons, there is at least one buffer per outgoing channel
* regardless of the configuration.
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel
* serialization.
@@ -164,9 +168,18 @@ public class NettyShuffleEnvironmentOptions {
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription(
"Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel)"
+ " in the credit-based flow control model. It should be configured at least 2 for good performance."
+ " 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.");
"Number of exclusive network buffers to use for each outgoing/incoming "
+ "channel (subpartition/input channel) in the credit-based flow"
+ " control model. It should be configured at least 2 for good "
+ "performance. 1 buffer is for receiving in-flight data in the"
+ " subpartition and 1 buffer is for parallel serialization. The"
+ " minimum valid value that can be configured is 0. When 0 "
+ "buffers-per-channel is configured, the exclusive network "
+ "buffers used per downstream incoming channel will be 0, but "
+ "for each upstream outgoing channel, max(1, configured value)"
+ " will be used. In other words we ensure that, for performance"
+ " reasons, there is at least one buffer per outgoing channel "
+ "regardless of the configuration.");

/**
* Number of extra network buffers to use for each outgoing/incoming gate (result
@@ -167,14 +167,18 @@ public void recycle(MemorySegment segment) {
public List<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest)
throws IOException {
checkArgument(
numberOfSegmentsToRequest > 0,
"Number of buffers to request must be larger than 0.");
numberOfSegmentsToRequest >= 0,
"Number of buffers to request must be non-negative.");

synchronized (factoryLock) {
if (isDestroyed) {
throw new IllegalStateException("Network buffer pool has already been destroyed.");
}

if (numberOfSegmentsToRequest == 0) {
return Collections.emptyList();
}

tryRedistributeBuffers(numberOfSegmentsToRequest);
}

@@ -57,7 +57,7 @@ public class ResultPartitionFactory {

private final BoundedBlockingSubpartitionType blockingSubpartitionType;

private final int networkBuffersPerChannel;
private final int configuredNetworkBuffersPerChannel;

private final int floatingNetworkBuffersPerGate;

@@ -82,7 +82,7 @@ public ResultPartitionFactory(
BatchShuffleReadBufferPool batchShuffleReadBufferPool,
ExecutorService batchShuffleReadIOExecutor,
BoundedBlockingSubpartitionType blockingSubpartitionType,
int networkBuffersPerChannel,
int configuredNetworkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
int networkBufferSize,
boolean blockingShuffleCompressionEnabled,
@@ -94,7 +94,7 @@ public ResultPartitionFactory(

this.partitionManager = partitionManager;
this.channelManager = channelManager;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.configuredNetworkBuffersPerChannel = configuredNetworkBuffersPerChannel;
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.bufferPoolFactory = bufferPoolFactory;
this.batchShuffleReadBufferPool = batchShuffleReadBufferPool;
@@ -159,11 +159,11 @@ public ResultPartition create(
if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
subpartitions[i] =
new PipelinedApproximateSubpartition(
i, networkBuffersPerChannel, pipelinedPartition);
i, configuredNetworkBuffersPerChannel, pipelinedPartition);
} else {
subpartitions[i] =
new PipelinedSubpartition(
i, networkBuffersPerChannel, pipelinedPartition);
i, configuredNetworkBuffersPerChannel, pipelinedPartition);
}
}

@@ -269,7 +269,7 @@ SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
return () -> {
Pair<Integer, Integer> pair =
NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
networkBuffersPerChannel,
configuredNetworkBuffersPerChannel,
floatingNetworkBuffersPerGate,
sortShuffleMinParallelism,
sortShuffleMinBuffers,
@@ -134,11 +134,12 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {

/** Requests exclusive buffers from the provider. */
void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
checkArgument(
!segments.isEmpty(),
"The number of exclusive buffers per channel should be larger than 0.");
checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
if (numExclusiveBuffers == 0) {
return;
}

Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
synchronized (bufferQueue) {
// AvailableBufferQueue::addExclusiveBuffer may release the previously allocated
// floating buffer, which requires the caller to recycle these released floating
@@ -132,6 +132,7 @@ public RemoteInputChannel(
maxBackoff,
numBytesIn,
numBuffersIn);
checkArgument(networkBuffersPerChannel >= 0, "Must be non-negative.");

this.initialCredit = networkBuffersPerChannel;
this.connectionId = checkNotNull(connectionId);
@@ -32,7 +32,6 @@
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -87,7 +86,9 @@ public SingleInputGateFactory(
this.taskExecutorResourceId = taskExecutorResourceId;
this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff();
this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff();
this.networkBuffersPerChannel = networkConfig.networkBuffersPerChannel();
this.networkBuffersPerChannel =
NettyShuffleUtils.getNetworkBuffersPerInputChannel(
networkConfig.networkBuffersPerChannel());
this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate();
this.blockingShuffleCompressionEnabled =
networkConfig.isBlockingShuffleCompressionEnabled();
@@ -107,12 +108,7 @@ public SingleInputGate create(
@Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
createBufferPoolFactory(
networkBufferPool,
networkBuffersPerChannel,
floatingNetworkBuffersPerGate,
igdd.getShuffleDescriptors().length,
igdd.getConsumedPartitionType());
createBufferPoolFactory(networkBufferPool, floatingNetworkBuffersPerGate);

BufferDecompressor bufferDecompressor = null;
if (igdd.getConsumedPartitionType().isBlocking() && blockingShuffleCompressionEnabled) {
@@ -235,11 +231,7 @@ protected InputChannel createKnownInputChannel(

@VisibleForTesting
static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
BufferPoolFactory bufferPoolFactory,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
int size,
ResultPartitionType type) {
BufferPoolFactory bufferPoolFactory, int floatingNetworkBuffersPerGate) {
Pair<Integer, Integer> pair =
NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
floatingNetworkBuffersPerGate);
@@ -34,14 +34,30 @@
*/
public class NettyShuffleUtils {

/**
* Calculates and returns the number of required exclusive network buffers per input channel.
*/
public static int getNetworkBuffersPerInputChannel(
final int configuredNetworkBuffersPerChannel) {
return configuredNetworkBuffersPerChannel;
}

/**
* Calculates and returns the floating network buffer pool size used by the input gate. The
* left/right value of the returned pair represent the min/max buffers require by the pool.
*/
public static Pair<Integer, Integer> getMinMaxFloatingBuffersPerInputGate(
final int numFloatingBuffersPerGate) {
// We should guarantee at-least one floating buffer for local channel state recovery.
return Pair.of(1, numFloatingBuffersPerGate);
}

/**
* Calculates and returns local network buffer pool size used by the result partition. The
* left/right value of the returned pair represent the min/max buffers require by the pool.
*/
public static Pair<Integer, Integer> getMinMaxNetworkBuffersPerResultPartition(
final int numBuffersPerChannel,
final int configuredNetworkBuffersPerChannel,
final int numFloatingBuffersPerGate,
final int sortShuffleMinParallelism,
final int sortShuffleMinBuffers,
@@ -53,9 +69,15 @@ public static Pair<Integer, Integer> getMinMaxNetworkBuffersPerResultPartition(
: numSubpartitions + 1;
int max =
type.isBounded()
? numSubpartitions * numBuffersPerChannel + numFloatingBuffersPerGate
? numSubpartitions * configuredNetworkBuffersPerChannel
+ numFloatingBuffersPerGate
: Integer.MAX_VALUE;
return Pair.of(min, max);
// for each upstream hash-based blocking/pipelined subpartition, at least one buffer is
// needed even the configured network buffers per channel is 0 and this behavior is for
// performance. If it's not guaranteed that each subpartition can get at least one buffer,
// more partial buffers with little data will be outputted to network/disk and recycled to
// be used by other subpartitions which can not get a buffer for data caching.
return Pair.of(min, Math.max(min, max));
}
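For illustration only (hypothetical numbers, not from this commit): a worked example of the clamping above for a pipelined bounded partition, where min is numSubpartitions + 1.

// Hypothetical values: 100 subpartitions, buffers-per-channel configured to 0,
// 8 floating buffers per gate.
int numSubpartitions = 100;
int configuredNetworkBuffersPerChannel = 0;
int numFloatingBuffersPerGate = 8;

int min = numSubpartitions + 1;                                   // 101
int max = numSubpartitions * configuredNetworkBuffersPerChannel
        + numFloatingBuffersPerGate;                              // 0 * 100 + 8 = 8
// Without the clamp, max (8) would be smaller than min (101). Returning
// Math.max(min, max) keeps the pool large enough for at least one buffer per
// subpartition plus one, matching the reasoning in the comment above.
int poolMax = Math.max(min, max);                                 // 101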

public static int computeNetworkBuffersForAnnouncing(
Expand All @@ -71,7 +93,7 @@ public static int computeNetworkBuffersForAnnouncing(
// Each input channel will retain N exclusive network buffers, N = numBuffersPerChannel.
// Each input gate is guaranteed to have a number of floating buffers.
int requirementForInputs =
numBuffersPerChannel * numTotalInputChannels
getNetworkBuffersPerInputChannel(numBuffersPerChannel) * numTotalInputChannels
+ getMinMaxFloatingBuffersPerInputGate(numFloatingBuffersPerGate).getRight()
* numTotalInputGates;

@@ -96,15 +118,15 @@ public static int computeNetworkBuffersForAnnouncing(

private static int getNumBuffersToAnnounceForResultPartition(
ResultPartitionType type,
int numBuffersPerChannel,
int configuredNetworkBuffersPerChannel,
int floatingBuffersPerGate,
int sortShuffleMinParallelism,
int sortShuffleMinBuffers,
int numSubpartitions) {

Pair<Integer, Integer> minAndMax =
getMinMaxNetworkBuffersPerResultPartition(
numBuffersPerChannel,
configuredNetworkBuffersPerChannel,
floatingBuffersPerGate,
sortShuffleMinParallelism,
sortShuffleMinBuffers,
@@ -294,8 +294,8 @@ public void testRequestMemorySegmentsMoreThanTotalBuffers() {
@Test(expected = IllegalArgumentException.class)
public void testRequestMemorySegmentsWithInvalidArgument() throws IOException {
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
// the number of requested buffers should be larger than zero
globalPool.requestMemorySegments(0);
// the number of requested buffers should be non-negative
globalPool.requestMemorySegments(-1);
globalPool.destroy();
fail("Should throw an IllegalArgumentException");
}
@@ -102,11 +102,7 @@ public SingleInputGateBuilder setupBufferPoolFactory(NettyShuffleEnvironment env
NettyShuffleEnvironmentConfiguration config = environment.getConfiguration();
this.bufferPoolFactory =
SingleInputGateFactory.createBufferPoolFactory(
environment.getNetworkBufferPool(),
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate(),
numberOfChannels,
partitionType);
environment.getNetworkBufferPool(), config.floatingNetworkBuffersPerGate());
this.segmentProvider = environment.getNetworkBufferPool();
return this;
}