Skip to content

Commit

Permalink
[FLINK-20328][tests] Fixed buffer calculation in UnalignedCheckpointI…
Browse files Browse the repository at this point in the history
…TCase.

Ultimately, it was one buffer missing.
  • Loading branch information
Arvid Heise authored and AHeise committed Nov 25, 2020
1 parent c354f7b commit 9902c4f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public static Object[][] parameters() {
}

private static UnalignedSettings createPipelineSettings(int parallelism, int slotsPerTaskManager, boolean slotSharing) {
int numShuffles = 5;
int numShuffles = 4;
return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
.setParallelism(parallelism)
.setSlotSharing(slotSharing)
.setNumSlots(slotSharing ? parallelism : parallelism * numShuffles)
.setNumBuffers(3 * slotsPerTaskManager * parallelism * numShuffles)
.setNumBuffers(getNumBuffers(parallelism, numShuffles))
.setSlotsPerTaskManager(slotsPerTaskManager)
.setExpectedFailures(5);
}
Expand All @@ -126,7 +126,7 @@ private static UnalignedSettings createCogroupSettings(int parallelism) {
.setParallelism(parallelism)
.setSlotSharing(true)
.setNumSlots(parallelism * numShuffles)
.setNumBuffers(3 * parallelism * parallelism * numShuffles)
.setNumBuffers(getNumBuffers(parallelism, numShuffles))
.setSlotsPerTaskManager(parallelism)
.setExpectedFailures(5);
}
Expand All @@ -137,11 +137,17 @@ private static UnalignedSettings createUnionSettings(int parallelism) {
.setParallelism(parallelism)
.setSlotSharing(true)
.setNumSlots(parallelism * numShuffles)
.setNumBuffers(3 * parallelism * parallelism * numShuffles)
.setNumBuffers(getNumBuffers(parallelism, numShuffles))
.setSlotsPerTaskManager(parallelism)
.setExpectedFailures(5);
}

private static int getNumBuffers(int parallelism, int numShuffles) {
int buffersPerSubtask = parallelism + 1 + // output side
2 * BUFFER_PER_CHANNEL * parallelism; // input side including recovery (=local channels count fully)
return buffersPerSubtask * parallelism * numShuffles;
}

private final UnalignedSettings settings;

public UnalignedCheckpointITCase(String desc, UnalignedSettings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
protected static final String NUM_FAILURES = "failures";
protected static final String NUM_DUPLICATES = "duplicates";
protected static final String NUM_LOST = "lost";
public static final int BUFFER_PER_CHANNEL = 1;

@Rule
public final TemporaryFolder temp = new TemporaryFolder();
Expand Down Expand Up @@ -445,7 +446,7 @@ public StreamExecutionEnvironment createEnvironment(File checkpointDir) {
conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
}

conf.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 1);
conf.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, BUFFER_PER_CHANNEL);
conf.set(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, slotsPerTaskManager);

final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
Expand Down

0 comments on commit 9902c4f

Please sign in to comment.