From 60d015cfc65d9f4b1a5765916ae14100d5dac70c Mon Sep 17 00:00:00 2001 From: "kevin.cyj" Date: Thu, 8 Jul 2021 20:23:10 +0800 Subject: [PATCH] [FLINK-16641][network] (Part#6) Enable to set network buffers per channel to 0 This PR enables to set the number of network buffer per channel (taskmanager.network.memory.buffers-per-channel) to 0. Previously, the value can not be set to 0 because of dead lock, FLINK-16641 solves the problem and we can set it to 0 now. --- .../all_taskmanager_network_section.html | 2 +- ...tty_shuffle_environment_configuration.html | 2 +- .../NettyShuffleEnvironmentOptions.java | 21 +++- .../io/network/buffer/NetworkBufferPool.java | 8 +- .../partition/ResultPartitionFactory.java | 12 +- .../partition/consumer/BufferManager.java | 9 +- .../consumer/RemoteInputChannel.java | 1 + .../consumer/SingleInputGateFactory.java | 18 +-- .../runtime/shuffle/NettyShuffleUtils.java | 34 +++++- .../network/buffer/NetworkBufferPoolTest.java | 4 +- .../consumer/SingleInputGateBuilder.java | 6 +- .../EventTimeWindowCheckpointingITCase.java | 23 +++- .../checkpointing/LocalRecoveryITCase.java | 2 + .../test/checkpointing/RescalingITCase.java | 22 +++- .../UnalignedCheckpointITCase.java | 19 +++- .../UnalignedCheckpointRescaleITCase.java | 104 +++++++++++------- .../UnalignedCheckpointTestBase.java | 9 +- 17 files changed, 197 insertions(+), 99 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index 1eb3114ff8daa..f15895cac4501 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -30,7 +30,7 @@
taskmanager.network.memory.buffers-per-channel
2 Integer - Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. + Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.
taskmanager.network.memory.floating-buffers-per-gate
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html index e651fcf823353..a304de937a08b 100644 --- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html +++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html @@ -48,7 +48,7 @@
taskmanager.network.memory.buffers-per-channel
2 Integer - Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. + Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.
taskmanager.network.memory.floating-buffers-per-gate
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java index 31e1c4d71b1f9..e18f39c47d456 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java @@ -154,7 +154,11 @@ public class NettyShuffleEnvironmentOptions { /** * Number of network buffers to use for each outgoing/incoming channel (subpartition/input - * channel). + * channel). The minimum valid value that can be configured is 0. When 0 buffers-per-channel is + * configured, the exclusive network buffers used per downstream incoming channel will be 0, but + * for each upstream outgoing channel, max(1, configured value) will be used. In other words we + * ensure that, for performance reasons, there is at least one buffer per outgoing channel + * regardless of the configuration. * *

Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel * serialization. @@ -164,9 +168,18 @@ public class NettyShuffleEnvironmentOptions { key("taskmanager.network.memory.buffers-per-channel") .defaultValue(2) .withDescription( - "Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel)" - + " in the credit-based flow control model. It should be configured at least 2 for good performance." - + " 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization."); + "Number of exclusive network buffers to use for each outgoing/incoming " + + "channel (subpartition/input channel) in the credit-based flow" + + " control model. It should be configured at least 2 for good " + + "performance. 1 buffer is for receiving in-flight data in the" + + " subpartition and 1 buffer is for parallel serialization. The" + + " minimum valid value that can be configured is 0. When 0 " + + "buffers-per-channel is configured, the exclusive network " + + "buffers used per downstream incoming channel will be 0, but " + + "for each upstream outgoing channel, max(1, configured value)" + + " will be used. In other words we ensure that, for performance" + + " reasons, there is at least one buffer per outgoing channel " + + "regardless of the configuration."); /** * Number of extra network buffers to use for each outgoing/incoming gate (result diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 4f506349eb167..6112d7d023c7c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -167,14 +167,18 @@ public void recycle(MemorySegment segment) { public List requestMemorySegments(int numberOfSegmentsToRequest) throws IOException { checkArgument( - numberOfSegmentsToRequest > 0, - "Number of buffers to request must be larger than 0."); + numberOfSegmentsToRequest >= 0, + "Number of buffers to request must be non-negative."); synchronized (factoryLock) { if (isDestroyed) { throw new IllegalStateException("Network buffer pool has already been destroyed."); } + if (numberOfSegmentsToRequest == 0) { + return Collections.emptyList(); + } + tryRedistributeBuffers(numberOfSegmentsToRequest); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 4336c9dbd3048..aad174e22e4cb 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -57,7 +57,7 @@ public class ResultPartitionFactory { private final BoundedBlockingSubpartitionType blockingSubpartitionType; - private final int networkBuffersPerChannel; + private final int configuredNetworkBuffersPerChannel; private final int floatingNetworkBuffersPerGate; @@ -82,7 +82,7 @@ public ResultPartitionFactory( BatchShuffleReadBufferPool batchShuffleReadBufferPool, ExecutorService batchShuffleReadIOExecutor, BoundedBlockingSubpartitionType blockingSubpartitionType, - int networkBuffersPerChannel, + int configuredNetworkBuffersPerChannel, int floatingNetworkBuffersPerGate, int networkBufferSize, boolean blockingShuffleCompressionEnabled, @@ -94,7 +94,7 @@ public ResultPartitionFactory( this.partitionManager = partitionManager; this.channelManager = channelManager; - this.networkBuffersPerChannel = networkBuffersPerChannel; + this.configuredNetworkBuffersPerChannel = configuredNetworkBuffersPerChannel; this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate; this.bufferPoolFactory = bufferPoolFactory; this.batchShuffleReadBufferPool = batchShuffleReadBufferPool; @@ -159,11 +159,11 @@ public ResultPartition create( if (type == ResultPartitionType.PIPELINED_APPROXIMATE) { subpartitions[i] = new PipelinedApproximateSubpartition( - i, networkBuffersPerChannel, pipelinedPartition); + i, configuredNetworkBuffersPerChannel, pipelinedPartition); } else { subpartitions[i] = new PipelinedSubpartition( - i, networkBuffersPerChannel, pipelinedPartition); + i, configuredNetworkBuffersPerChannel, pipelinedPartition); } } @@ -269,7 +269,7 @@ SupplierWithException createBufferPoolFactory( return () -> { Pair pair = NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition( - networkBuffersPerChannel, + configuredNetworkBuffersPerChannel, floatingNetworkBuffersPerGate, sortShuffleMinParallelism, sortShuffleMinBuffers, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java index 0867fbba82863..f417f2dee4b53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java @@ -134,11 +134,12 @@ private boolean shouldContinueRequest(BufferPool bufferPool) { /** Requests exclusive buffers from the provider. */ void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException { - Collection segments = globalPool.requestMemorySegments(numExclusiveBuffers); - checkArgument( - !segments.isEmpty(), - "The number of exclusive buffers per channel should be larger than 0."); + checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative."); + if (numExclusiveBuffers == 0) { + return; + } + Collection segments = globalPool.requestMemorySegments(numExclusiveBuffers); synchronized (bufferQueue) { // AvailableBufferQueue::addExclusiveBuffer may release the previously allocated // floating buffer, which requires the caller to recycle these released floating diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 4ff36b9a63262..f9cedf71105b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -132,6 +132,7 @@ public RemoteInputChannel( maxBackoff, numBytesIn, numBuffersIn); + checkArgument(networkBuffersPerChannel >= 0, "Must be non-negative."); this.initialCredit = networkBuffersPerChannel; this.connectionId = checkNotNull(connectionId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index 8b0032c15b0ef..0ecc524760a8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import org.apache.flink.runtime.shuffle.NettyShuffleUtils; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; @@ -87,7 +86,9 @@ public SingleInputGateFactory( this.taskExecutorResourceId = taskExecutorResourceId; this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff(); this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff(); - this.networkBuffersPerChannel = networkConfig.networkBuffersPerChannel(); + this.networkBuffersPerChannel = + NettyShuffleUtils.getNetworkBuffersPerInputChannel( + networkConfig.networkBuffersPerChannel()); this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate(); this.blockingShuffleCompressionEnabled = networkConfig.isBlockingShuffleCompressionEnabled(); @@ -107,12 +108,7 @@ public SingleInputGate create( @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { SupplierWithException bufferPoolFactory = - createBufferPoolFactory( - networkBufferPool, - networkBuffersPerChannel, - floatingNetworkBuffersPerGate, - igdd.getShuffleDescriptors().length, - igdd.getConsumedPartitionType()); + createBufferPoolFactory(networkBufferPool, floatingNetworkBuffersPerGate); BufferDecompressor bufferDecompressor = null; if (igdd.getConsumedPartitionType().isBlocking() && blockingShuffleCompressionEnabled) { @@ -235,11 +231,7 @@ protected InputChannel createKnownInputChannel( @VisibleForTesting static SupplierWithException createBufferPoolFactory( - BufferPoolFactory bufferPoolFactory, - int networkBuffersPerChannel, - int floatingNetworkBuffersPerGate, - int size, - ResultPartitionType type) { + BufferPoolFactory bufferPoolFactory, int floatingNetworkBuffersPerGate) { Pair pair = NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate( floatingNetworkBuffersPerGate); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java index 40e556d2d3150..936ffebd1a868 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java @@ -34,14 +34,30 @@ */ public class NettyShuffleUtils { + /** + * Calculates and returns the number of required exclusive network buffers per input channel. + */ + public static int getNetworkBuffersPerInputChannel( + final int configuredNetworkBuffersPerChannel) { + return configuredNetworkBuffersPerChannel; + } + + /** + * Calculates and returns the floating network buffer pool size used by the input gate. The + * left/right value of the returned pair represent the min/max buffers require by the pool. + */ public static Pair getMinMaxFloatingBuffersPerInputGate( final int numFloatingBuffersPerGate) { // We should guarantee at-least one floating buffer for local channel state recovery. return Pair.of(1, numFloatingBuffersPerGate); } + /** + * Calculates and returns local network buffer pool size used by the result partition. The + * left/right value of the returned pair represent the min/max buffers require by the pool. + */ public static Pair getMinMaxNetworkBuffersPerResultPartition( - final int numBuffersPerChannel, + final int configuredNetworkBuffersPerChannel, final int numFloatingBuffersPerGate, final int sortShuffleMinParallelism, final int sortShuffleMinBuffers, @@ -53,9 +69,15 @@ public static Pair getMinMaxNetworkBuffersPerResultPartition( : numSubpartitions + 1; int max = type.isBounded() - ? numSubpartitions * numBuffersPerChannel + numFloatingBuffersPerGate + ? numSubpartitions * configuredNetworkBuffersPerChannel + + numFloatingBuffersPerGate : Integer.MAX_VALUE; - return Pair.of(min, max); + // for each upstream hash-based blocking/pipelined subpartition, at least one buffer is + // needed even the configured network buffers per channel is 0 and this behavior is for + // performance. If it's not guaranteed that each subpartition can get at least one buffer, + // more partial buffers with little data will be outputted to network/disk and recycled to + // be used by other subpartitions which can not get a buffer for data caching. + return Pair.of(min, Math.max(min, max)); } public static int computeNetworkBuffersForAnnouncing( @@ -71,7 +93,7 @@ public static int computeNetworkBuffersForAnnouncing( // Each input channel will retain N exclusive network buffers, N = numBuffersPerChannel. // Each input gate is guaranteed to have a number of floating buffers. int requirementForInputs = - numBuffersPerChannel * numTotalInputChannels + getNetworkBuffersPerInputChannel(numBuffersPerChannel) * numTotalInputChannels + getMinMaxFloatingBuffersPerInputGate(numFloatingBuffersPerGate).getRight() * numTotalInputGates; @@ -96,7 +118,7 @@ public static int computeNetworkBuffersForAnnouncing( private static int getNumBuffersToAnnounceForResultPartition( ResultPartitionType type, - int numBuffersPerChannel, + int configuredNetworkBuffersPerChannel, int floatingBuffersPerGate, int sortShuffleMinParallelism, int sortShuffleMinBuffers, @@ -104,7 +126,7 @@ private static int getNumBuffersToAnnounceForResultPartition( Pair minAndMax = getMinMaxNetworkBuffersPerResultPartition( - numBuffersPerChannel, + configuredNetworkBuffersPerChannel, floatingBuffersPerGate, sortShuffleMinParallelism, sortShuffleMinBuffers, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index 235ca3c9c4e41..48c0b2eeeaec7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -294,8 +294,8 @@ public void testRequestMemorySegmentsMoreThanTotalBuffers() { @Test(expected = IllegalArgumentException.class) public void testRequestMemorySegmentsWithInvalidArgument() throws IOException { NetworkBufferPool globalPool = new NetworkBufferPool(10, 128); - // the number of requested buffers should be larger than zero - globalPool.requestMemorySegments(0); + // the number of requested buffers should be non-negative + globalPool.requestMemorySegments(-1); globalPool.destroy(); fail("Should throw an IllegalArgumentException"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 1ea50f20fc4b7..1bf5f27a870c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -102,11 +102,7 @@ public SingleInputGateBuilder setupBufferPoolFactory(NettyShuffleEnvironment env NettyShuffleEnvironmentConfiguration config = environment.getConfiguration(); this.bufferPoolFactory = SingleInputGateFactory.createBufferPoolFactory( - environment.getNetworkBufferPool(), - config.networkBuffersPerChannel(), - config.floatingNetworkBuffersPerGate(), - numberOfChannels, - partitionType); + environment.getNetworkBufferPool(), config.floatingNetworkBuffersPerGate()); this.segmentProvider = environment.getNetworkBufferPool(); return this; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index e00a3dffb4fc5..7280e8fa26cb1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBOptions; @@ -69,6 +70,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; import static org.junit.Assert.assertEquals; @@ -101,7 +103,9 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { private AbstractStateBackend stateBackend; - @Parameterized.Parameter public StateBackendEnum stateBackendEnum; + public StateBackendEnum stateBackendEnum; + + private final int buffersPerChannel; enum StateBackendEnum { MEM, @@ -113,9 +117,18 @@ enum StateBackendEnum { FILE_ASYNC } - @Parameterized.Parameters(name = "statebackend type ={0}") - public static Collection parameter() { - return Arrays.asList(StateBackendEnum.values()); + @Parameterized.Parameters(name = "statebackend type ={0}, buffersPerChannel = {1}") + public static Collection parameter() { + return Arrays.stream(StateBackendEnum.values()) + .map((type) -> new Object[][] {{type, 0}, {type, 2}}) + .flatMap(Arrays::stream) + .collect(Collectors.toList()); + } + + public EventTimeWindowCheckpointingITCase( + StateBackendEnum stateBackendEnum, int buffersPerChannel) { + this.stateBackendEnum = stateBackendEnum; + this.buffersPerChannel = buffersPerChannel; } protected StateBackendEnum getStateBackend() { @@ -154,6 +167,8 @@ private Configuration getConfiguration() throws Exception { } Configuration config = createClusterConfig(); + config.setInteger( + NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel); switch (stateBackendEnum) { case MEM: diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java index 80a18be4d3144..8ef3f7efee0d9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java @@ -101,6 +101,8 @@ private static class EventTimeWindowCheckpointingITCaseInstance public EventTimeWindowCheckpointingITCaseInstance( StateBackendEnum backendEnum, boolean localRecoveryEnabled) { + super(backendEnum, 2); + this.backendEnum = backendEnum; this.localRecoveryEnabled = localRecoveryEnabled; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 5e444689b56ea..9854df21360b7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; @@ -68,6 +69,7 @@ import java.io.File; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -92,12 +94,22 @@ public class RescalingITCase extends TestLogger { private static final int slotsPerTaskManager = 2; private static final int numSlots = numTaskManagers * slotsPerTaskManager; - @Parameterized.Parameters(name = "backend = {0}") - public static Object[] data() { - return new Object[] {"filesystem", "rocksdb"}; + @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {"filesystem", 2}, {"rocksdb", 0}, {"filesystem", 0}, {"rocksdb", 2} + }); } - @Parameterized.Parameter public String backend; + public RescalingITCase(String backend, int buffersPerChannel) { + this.backend = backend; + this.buffersPerChannel = buffersPerChannel; + } + + private final String backend; + + private final int buffersPerChannel; private String currentBackend = null; @@ -130,6 +142,8 @@ public void setup() throws Exception { CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); config.setString( CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); + config.setInteger( + NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel); cluster = new MiniClusterWithClientResource( diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index defe9bfb814b2..f963d4d78f821 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -224,7 +224,8 @@ public String toString() { } } - @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout = {3}") + @Parameterized.Parameters( + name = "{0} with {2} channels, p = {1}, timeout = {3}, buffersPerChannel = {4}") public static Object[][] parameters() { Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0}; @@ -243,6 +244,13 @@ public static Object[][] parameters() { }; return Stream.of(runs) .map(params -> addDefaults(params, defaults)) + .map( + params -> + new Object[][] { + ArrayUtils.add(params, 0), + ArrayUtils.add(params, BUFFER_PER_CHANNEL) + }) + .flatMap(Arrays::stream) .toArray(Object[][]::new); } @@ -254,7 +262,11 @@ private static Object[] addDefaults(Object[] params, Object[] defaults) { private final UnalignedSettings settings; public UnalignedCheckpointITCase( - Topology topology, int parallelism, ChannelType channelType, int timeout) { + Topology topology, + int parallelism, + ChannelType channelType, + int timeout, + int buffersPerChannel) { settings = new UnalignedSettings(topology) .setParallelism(parallelism) @@ -266,7 +278,8 @@ public UnalignedCheckpointITCase( // after triggering) .setCheckpointTimeout(Duration.ofSeconds(30)) .setTolerableCheckpointFailures(3) - .setAlignmentTimeout(timeout); + .setAlignmentTimeout(timeout) + .setBuffersPerChannel(buffersPerChannel); } @Test diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java index fed104f71298a..fd89c38a65559 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java @@ -46,11 +46,13 @@ import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; +import org.apache.commons.lang3.ArrayUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; @@ -65,6 +67,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas private final Topology topology; private final int oldParallelism; private final int newParallelism; + private final int buffersPerChannel; enum Topology implements DagCreator { PIPELINE { @@ -456,53 +459,68 @@ public void processBroadcastElement(Long value, Context ctx, Collector out } } - @Parameterized.Parameters(name = "{0} {1} from {2} to {3}") + @Parameterized.Parameters(name = "{0} {1} from {2} to {3}, buffersPerChannel = {4}") public static Object[][] getScaleFactors() { - return new Object[][] { - new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7}, - new Object[] {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12}, - new Object[] {"downscale", Topology.KEYED_BROADCAST, 7, 2}, - new Object[] {"upscale", Topology.KEYED_BROADCAST, 2, 7}, - new Object[] {"downscale", Topology.BROADCAST, 5, 2}, - new Object[] {"upscale", Topology.BROADCAST, 2, 5}, - new Object[] {"upscale", Topology.PIPELINE, 1, 2}, - new Object[] {"upscale", Topology.PIPELINE, 2, 3}, - new Object[] {"upscale", Topology.PIPELINE, 3, 7}, - new Object[] {"upscale", Topology.PIPELINE, 4, 8}, - new Object[] {"upscale", Topology.PIPELINE, 20, 21}, - new Object[] {"downscale", Topology.PIPELINE, 2, 1}, - new Object[] {"downscale", Topology.PIPELINE, 3, 2}, - new Object[] {"downscale", Topology.PIPELINE, 7, 3}, - new Object[] {"downscale", Topology.PIPELINE, 8, 4}, - new Object[] {"downscale", Topology.PIPELINE, 21, 20}, - new Object[] {"no scale", Topology.PIPELINE, 1, 1}, - new Object[] {"no scale", Topology.PIPELINE, 3, 3}, - new Object[] {"no scale", Topology.PIPELINE, 7, 7}, - new Object[] {"no scale", Topology.PIPELINE, 20, 20}, - new Object[] {"upscale", Topology.UNION, 1, 2}, - new Object[] {"upscale", Topology.UNION, 2, 3}, - new Object[] {"upscale", Topology.UNION, 3, 7}, - new Object[] {"downscale", Topology.UNION, 2, 1}, - new Object[] {"downscale", Topology.UNION, 3, 2}, - new Object[] {"downscale", Topology.UNION, 7, 3}, - new Object[] {"no scale", Topology.UNION, 1, 1}, - new Object[] {"no scale", Topology.UNION, 7, 7}, - new Object[] {"upscale", Topology.MULTI_INPUT, 1, 2}, - new Object[] {"upscale", Topology.MULTI_INPUT, 2, 3}, - new Object[] {"upscale", Topology.MULTI_INPUT, 3, 7}, - new Object[] {"downscale", Topology.MULTI_INPUT, 2, 1}, - new Object[] {"downscale", Topology.MULTI_INPUT, 3, 2}, - new Object[] {"downscale", Topology.MULTI_INPUT, 7, 3}, - new Object[] {"no scale", Topology.MULTI_INPUT, 1, 1}, - new Object[] {"no scale", Topology.MULTI_INPUT, 7, 7}, - }; + Object[][] parameters = + new Object[][] { + new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7}, + new Object[] {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12}, + new Object[] {"downscale", Topology.KEYED_BROADCAST, 7, 2}, + new Object[] {"upscale", Topology.KEYED_BROADCAST, 2, 7}, + new Object[] {"downscale", Topology.BROADCAST, 5, 2}, + new Object[] {"upscale", Topology.BROADCAST, 2, 5}, + new Object[] {"upscale", Topology.PIPELINE, 1, 2}, + new Object[] {"upscale", Topology.PIPELINE, 2, 3}, + new Object[] {"upscale", Topology.PIPELINE, 3, 7}, + new Object[] {"upscale", Topology.PIPELINE, 4, 8}, + new Object[] {"upscale", Topology.PIPELINE, 20, 21}, + new Object[] {"downscale", Topology.PIPELINE, 2, 1}, + new Object[] {"downscale", Topology.PIPELINE, 3, 2}, + new Object[] {"downscale", Topology.PIPELINE, 7, 3}, + new Object[] {"downscale", Topology.PIPELINE, 8, 4}, + new Object[] {"downscale", Topology.PIPELINE, 21, 20}, + new Object[] {"no scale", Topology.PIPELINE, 1, 1}, + new Object[] {"no scale", Topology.PIPELINE, 3, 3}, + new Object[] {"no scale", Topology.PIPELINE, 7, 7}, + new Object[] {"no scale", Topology.PIPELINE, 20, 20}, + new Object[] {"upscale", Topology.UNION, 1, 2}, + new Object[] {"upscale", Topology.UNION, 2, 3}, + new Object[] {"upscale", Topology.UNION, 3, 7}, + new Object[] {"downscale", Topology.UNION, 2, 1}, + new Object[] {"downscale", Topology.UNION, 3, 2}, + new Object[] {"downscale", Topology.UNION, 7, 3}, + new Object[] {"no scale", Topology.UNION, 1, 1}, + new Object[] {"no scale", Topology.UNION, 7, 7}, + new Object[] {"upscale", Topology.MULTI_INPUT, 1, 2}, + new Object[] {"upscale", Topology.MULTI_INPUT, 2, 3}, + new Object[] {"upscale", Topology.MULTI_INPUT, 3, 7}, + new Object[] {"downscale", Topology.MULTI_INPUT, 2, 1}, + new Object[] {"downscale", Topology.MULTI_INPUT, 3, 2}, + new Object[] {"downscale", Topology.MULTI_INPUT, 7, 3}, + new Object[] {"no scale", Topology.MULTI_INPUT, 1, 1}, + new Object[] {"no scale", Topology.MULTI_INPUT, 7, 7}, + }; + return Arrays.stream(parameters) + .map( + params -> + new Object[][] { + ArrayUtils.add(params, 0), + ArrayUtils.add(params, BUFFER_PER_CHANNEL) + }) + .flatMap(Arrays::stream) + .toArray(Object[][]::new); } public UnalignedCheckpointRescaleITCase( - String desc, Topology topology, int oldParallelism, int newParallelism) { + String desc, + Topology topology, + int oldParallelism, + int newParallelism, + int buffersPerChannel) { this.topology = topology; this.oldParallelism = oldParallelism; this.newParallelism = newParallelism; + this.buffersPerChannel = buffersPerChannel; } @Test @@ -510,7 +528,8 @@ public void shouldRescaleUnalignedCheckpoint() throws Exception { final UnalignedSettings prescaleSettings = new UnalignedSettings(topology) .setParallelism(oldParallelism) - .setExpectedFailures(1); + .setExpectedFailures(1) + .setBuffersPerChannel(buffersPerChannel); prescaleSettings.setGenerateCheckpoint(true); final File checkpointDir = super.execute(prescaleSettings); @@ -518,7 +537,8 @@ public void shouldRescaleUnalignedCheckpoint() throws Exception { final UnalignedSettings postscaleSettings = new UnalignedSettings(topology) .setParallelism(newParallelism) - .setExpectedFailures(1); + .setExpectedFailures(1) + .setBuffersPerChannel(buffersPerChannel); postscaleSettings.setRestoreCheckpoint(checkpointDir); super.execute(postscaleSettings); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 18aab9146a886..b04a7edf63b65 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -684,6 +684,7 @@ protected static class UnalignedSettings { private Duration checkpointTimeout = CHECKPOINTING_TIMEOUT.defaultValue(); private int failuresAfterSourceFinishes = 0; private ChannelType channelType = ChannelType.MIXED; + private int buffersPerChannel = 1; public UnalignedSettings(DagCreator dagCreator) { this.dagCreator = dagCreator; @@ -734,6 +735,11 @@ public UnalignedSettings setTolerableCheckpointFailures(int tolerableCheckpointF return this; } + public UnalignedSettings setBuffersPerChannel(int buffersPerChannel) { + this.buffersPerChannel = buffersPerChannel; + return this; + } + public void configure(StreamExecutionEnvironment env) { env.enableCheckpointing(Math.max(100L, parallelism * 50L)); env.getCheckpointConfig().setAlignmentTimeout(Duration.ofMillis(alignmentTimeout)); @@ -770,8 +776,7 @@ public Configuration getConfiguration(File checkpointDir) { restoreCheckpoint.toURI().toString()); } - conf.set( - NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, BUFFER_PER_CHANNEL); + conf.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel); conf.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 60000); conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); return conf;