Skip to content

Commit

Permalink
[FLINK-24470][streaming] Added EMA calculation for buffer size inside…
Browse files Browse the repository at this point in the history
… BufferDebloater
  • Loading branch information
akalash authored and pnowojski committed Oct 28, 2021
1 parent e40b5c8 commit adb88cd
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class BufferDebloater {
private final long maxBufferSize;
private final long minBufferSize;
private final int bufferDebloatThresholdPercentages;
private final BufferSizeEMA bufferSizeEMA;

private int lastBufferSize;

private Duration lastEstimatedTimeToConsumeBuffers = Duration.ZERO;

public BufferDebloater(Configuration taskConfig, IndexedInputGate[] inputGates) {
Expand All @@ -54,6 +56,11 @@ public BufferDebloater(Configuration taskConfig, IndexedInputGate[] inputGates)
taskConfig.getInteger(BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES);

this.lastBufferSize = (int) maxBufferSize;
bufferSizeEMA =
new BufferSizeEMA(
(int) maxBufferSize,
(int) minBufferSize,
taskConfig.get(TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES));

// Right now the buffer size can not be greater than integer max value according to
// MemorySegment and buffer implementation.
Expand All @@ -72,13 +79,9 @@ public void recalculateBufferSize(long currentThroughput) {
for (IndexedInputGate inputGate : inputGates) {
totalNumber += Math.max(1, inputGate.getBuffersInUseCount());
}
int newSize =
(int)
Math.max(
minBufferSize,
Math.min(
desiredTotalBufferSizeInBytes / totalNumber,
maxBufferSize));

int newSize = bufferSizeEMA.calculateBufferSize(desiredTotalBufferSizeInBytes, totalNumber);

lastEstimatedTimeToConsumeBuffers =
Duration.ofMillis(
newSize * totalNumber * MILLIS_IN_SECOND / Math.max(1, currentThroughput));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.tasks.bufferdebloat;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Implementation of the 'Exponential moving average' (EMA) algorithm for the buffer size.
 *
 * <p>Every call to {@link #calculateBufferSize(long, int)} moves the previously returned buffer
 * size towards the newly requested one by the fraction {@code alpha = 2 / (numberOfSamples + 1)}.
 * The result is then limited from above by {@code maxBufferSize} and from below by {@code
 * minBufferSize}.
 *
 * <p>The class keeps mutable state (the last calculated size) and performs no synchronization.
 */
public class BufferSizeEMA {
    private final int maxBufferSize;
    private final int minBufferSize;
    /** EMA algorithm specific constant which is responsible for the speed of reaction. */
    private final double alpha;

    /** The most recently calculated buffer size; it starts at {@code maxBufferSize}. */
    private int lastBufferSize;

    /**
     * Creates the calculator with the initial buffer size equal to {@code maxBufferSize}.
     *
     * @param maxBufferSize Upper limit of the calculated buffer size.
     * @param minBufferSize Lower limit of the calculated buffer size.
     * @param numberOfSamples Number of samples the average is smoothed over; the larger it is,
     *     the slower the buffer size reacts to a new value.
     * @throws IllegalArgumentException if {@code numberOfSamples} is not positive.
     */
    public BufferSizeEMA(int maxBufferSize, int minBufferSize, long numberOfSamples) {
        // A non-positive number of samples would lead to an infinite, negative or greater than 1
        // alpha, which makes the result jump between the limits instead of being smoothed.
        checkArgument(numberOfSamples > 0, "Number of samples should be positive");

        this.maxBufferSize = maxBufferSize;
        this.minBufferSize = minBufferSize;
        // The sum is calculated in double in order to avoid the long overflow for huge values.
        this.alpha = 2.0 / (numberOfSamples + 1.0);
        this.lastBufferSize = maxBufferSize;
    }

    /**
     * Calculating the buffer size over total possible buffers size and number of buffers in use.
     *
     * @param totalBufferSizeInBytes Total buffers size.
     * @param totalBuffers Total number of buffers in use.
     * @return Buffer size calculated according to the implemented algorithm. The same value is
     *     remembered as the starting point of the next calculation.
     * @throws IllegalArgumentException if {@code totalBufferSizeInBytes} is negative or {@code
     *     totalBuffers} is not positive.
     */
    public int calculateBufferSize(long totalBufferSizeInBytes, int totalBuffers) {
        checkArgument(totalBufferSizeInBytes >= 0, "Size of buffer should be non negative");
        checkArgument(totalBuffers > 0, "Number of buffers should be positive");

        // The result value is always limited by the max buffer size while the instant value is
        // potentially unlimited. It can lead to an instant change from min to max value in case
        // when the instant value is significantly larger than the possible max value.
        // The solution is to limit the instant buffer size by twice the current buffer size in
        // order to have the same growth and shrink speeds. For example, if the instant value is
        // equal to 0 and the current value is 16000, it can be decreased at maximum by
        // 1600 (suppose alpha=0.1). The idea is to allow increasing and decreasing the size by the
        // same number. So if the instant value is large (for example 100000), it is possible to
        // increase the current value by 1600 (the same as decreasing) because the limit is
        // 2 * currentValue = 32000.
        // Example of change speed (numberOfSamples = 20):
        // shrinking = 32768, 29647, 26823, 24268, 21956, 19864
        // growing = 19864, 21755, 23826, 26095, 28580, 31301, 32768
        long desirableBufferSize =
                Math.min(totalBufferSizeInBytes / totalBuffers, 2L * lastBufferSize);

        // The fractional part of the smoothed value is intentionally dropped by the cast.
        int smoothedBufferSize =
                (int) (lastBufferSize + alpha * (desirableBufferSize - lastBufferSize));

        lastBufferSize = Math.max(minBufferSize, Math.min(smoothedBufferSize, maxBufferSize));
        return lastBufferSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES;
import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME;
Expand All @@ -190,9 +192,11 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1658,6 +1662,7 @@ public void testBufferSizeRecalculationStartSuccessfully() throws Exception {
new Configuration()
.set(BUFFER_DEBLOAT_PERIOD, Duration.ofHours(10))
.set(BUFFER_DEBLOAT_TARGET, Duration.ofSeconds(1))
.set(BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES, 1)
.set(BUFFER_DEBLOAT_ENABLED, true);

Map<String, Metric> metrics = new ConcurrentHashMap<>();
Expand All @@ -1682,21 +1687,32 @@ public long calculateThroughput() {
harness.processAll();
harness.streamTask.debloat();

int expectedBufferSize = expectedThroughput / inputChannels;
long lastBufferSize = -1;
for (InputGate inputGate : harness.streamTask.getEnvironment().getAllInputGates()) {
for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
long currentBufferSize =
((TestInputChannel) inputGate.getChannel(i)).getCurrentBufferSize();
assertThat(
((TestInputChannel) inputGate.getChannel(i)).getCurrentBufferSize(),
is(expectedBufferSize));
currentBufferSize,
lessThan(MEMORY_SEGMENT_SIZE.defaultValue().getBytes()));

assertThat(currentBufferSize, greaterThan(0L));

if (lastBufferSize > 0) {
assertThat(lastBufferSize, is(currentBufferSize));
}
lastBufferSize = currentBufferSize;
}
}
assertThat(
((Gauge<Integer>) metrics.get(MetricNames.DEBLOATED_BUFFER_SIZE)).getValue(),
is(expectedBufferSize));
is((int) lastBufferSize));
// The estimated time should be greater than the configured time because the buffer size
// is changed according to EMA, not instantly.
assertThat(
((Gauge<Long>) metrics.get(MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS))
.getValue(),
is(999L));
greaterThan(1000L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static java.util.Collections.singletonList;
import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE;
import static org.apache.flink.configuration.TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE;
Expand Down Expand Up @@ -279,6 +280,7 @@ public BufferDebloater expectBufferSize(int expectedBufferSize) {
new BufferDebloater(
new Configuration()
.set(BUFFER_DEBLOAT_ENABLED, true)
.set(BUFFER_DEBLOAT_SAMPLES, 1)
.set(BUFFER_DEBLOAT_TARGET, Duration.ofMillis(debloatTarget))
.set(
MEMORY_SEGMENT_SIZE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.tasks.bufferdebloat;

import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

/**
 * Test for {@link BufferSizeEMA}.
 *
 * <p>All calculators are created with 3 samples, so each step moves the current size halfway
 * (alpha = 2 / (3 + 1) = 0.5) towards the requested one, dropping the fractional part.
 */
public class BufferSizeEMATest extends TestLogger {

    @Test
    public void testCalculationBufferSize() {
        BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);

        // The result value tends to the bottom limit but it takes a while until it reaches it.
        assertThat(calculator.calculateBufferSize(111, 13), is(104));
        assertThat(calculator.calculateBufferSize(107, 7), is(59));
        assertThat(calculator.calculateBufferSize(107, 7), is(37));
        assertThat(calculator.calculateBufferSize(107, 7), is(26));
        assertThat(calculator.calculateBufferSize(107, 7), is(20));
        assertThat(calculator.calculateBufferSize(107, 7), is(17));
        assertThat(calculator.calculateBufferSize(107, 13), is(12));
        assertThat(calculator.calculateBufferSize(107, 13), is(10));

        // Growing back: the requested size is limited by twice the current size on each step.
        assertThat(calculator.calculateBufferSize(333, 1), is(15));
        assertThat(calculator.calculateBufferSize(333, 1), is(22));
        assertThat(calculator.calculateBufferSize(333, 1), is(33));
        assertThat(calculator.calculateBufferSize(333, 1), is(49));
        assertThat(calculator.calculateBufferSize(333, 1), is(73));
        assertThat(calculator.calculateBufferSize(333, 1), is(109));
    }

    @Test
    public void testSizeGreaterThanMaxSize() {
        BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);

        // Decrease the value to less than max.
        assertThat(calculator.calculateBufferSize(0, 1), is(100));

        // It is impossible to exceed the maximum.
        assertThat(calculator.calculateBufferSize(1000, 1), is(150));
        assertThat(calculator.calculateBufferSize(1000, 1), is(200));
        assertThat(calculator.calculateBufferSize(1000, 1), is(200));
    }

    @Test
    public void testSizeLessThanMinSize() {
        BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);

        // It is impossible to go below the minimum.
        assertThat(calculator.calculateBufferSize(0, 1), is(100));
        assertThat(calculator.calculateBufferSize(0, 1), is(50));
        assertThat(calculator.calculateBufferSize(0, 1), is(25));
        assertThat(calculator.calculateBufferSize(0, 1), is(12));
        assertThat(calculator.calculateBufferSize(0, 1), is(10));
        assertThat(calculator.calculateBufferSize(0, 1), is(10));
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNegativeTotalSize() {
        // The limits are valid (min < max), so only the negative total size is illegal here.
        BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);
        calculator.calculateBufferSize(-1, 1);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testZeroBuffers() {
        // The limits are valid (min < max), so only the zero number of buffers is illegal here.
        BufferSizeEMA calculator = new BufferSizeEMA(200, 10, 3);
        calculator.calculateBufferSize(1, 0);
    }
}

0 comments on commit adb88cd

Please sign in to comment.