[FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled

Problem:
Under back-pressure, buffer debloating shrinks the buffer size down to 256 bytes.
Such small buffers may be too small to hold the output produced from a single input record, so the task thread repeatedly requests new buffers and often blocks.
That results in significant checkpoint delays (up to minutes instead of seconds).

- Adding more overdraft buffers helps, but the number needed depends on the job's degree of parallelism (DoP).
- Raising taskmanager.memory.min-segment-size above 256 helps, but the required value depends on the multiplication factor of the operator.
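To illustrate why the debloated minimum hurts, here is a small sketch with hypothetical numbers (the record size and the ceiling-division estimate are assumptions for illustration, not measurements from Flink):

```java
public class BufferRequestEstimate {
    public static void main(String[] args) {
        // Hypothetical numbers: a 4 KiB serialized record emitted into
        // buffers debloated down to the 256-byte minimum segment size.
        int bufferSize = 256;       // debloated buffer size
        int recordBytes = 4 * 1024; // assumed serialized record size
        // Each buffer holds at most bufferSize bytes, so emitting one
        // record needs this many buffer requests (ceiling division),
        // each of which may block under back-pressure:
        int requests = (recordBytes + bufferSize - 1) / bufferSize;
        System.out.println(requests); // prints 16
    }
}
```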

Solution:
- Ignore the Buffer Debloater hints and extend the buffer if possible, when the suggested size prevents emitting an output record fully AND this is the last available buffer.
- Prevent the subsequent flush of that buffer so that more output records can be emitted (flatMap-like and join operators).
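The resizing rule applied in the diff can be sketched as a standalone function (a simplified sketch only; `extendedSize` is a hypothetical helper, not Flink API, and the "last available buffer" check is reduced to a comment):

```java
public class TrimSketch {
    // Hypothetical helper illustrating the rule: keep the debloater-suggested
    // capacity when the record fits; otherwise (and, in Flink, only when no
    // other buffer is available) grow past the hint so the record fits,
    // plus one extra byte so the buffer is not flushed immediately.
    static int extendedSize(int maxCapacity, int writableBytes, int recordRemaining) {
        if (recordRemaining < writableBytes) {
            return maxCapacity; // record fits into the current buffer
        }
        return maxCapacity + (recordRemaining - writableBytes) + 1;
    }

    public static void main(String[] args) {
        // Debloated to 256 bytes, 0 bytes writable, 300-byte record pending:
        System.out.println(extendedSize(256, 0, 300)); // prints 557
        // A 100-byte record fits into 500 writable bytes: size unchanged.
        System.out.println(extendedSize(1024, 500, 100)); // prints 1024
    }
}
```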
rkhachatryan committed Sep 14, 2023
1 parent 2ced46b commit c7f6470
Showing 2 changed files with 48 additions and 4 deletions.
@@ -295,11 +295,34 @@ private BufferBuilder appendUnicastDataForNewRecord(
addToSubpartition(buffer, targetSubpartition, 0, record.remaining());
}

-buffer.appendAndCommit(record);
+append(record, buffer);

return buffer;
}

private int append(ByteBuffer record, BufferBuilder buffer) {
// Try to avoid hard back-pressure in the subsequent calls to request buffers
// by ignoring Buffer Debloater hints and extending the buffer if possible (trim).
// This decreases the probability of hard back-pressure in cases when
// the output size varies significantly and BD suggests too small values.
// The hint will be re-applied on the next iteration.
if (record.remaining() >= buffer.getWritableBytes()) {
// This 2nd check is expensive, so it shouldn't be re-ordered.
// However, it has the same cost as the subsequent call to request buffer, so it doesn't
// affect the performance much.
if (!bufferPool.isAvailable()) {
// Add 1 byte to prevent flushing the buffer immediately, so that the
// next record can potentially fit into it
int newSize =
buffer.getMaxCapacity()
+ (record.remaining() - buffer.getWritableBytes())
+ 1;
buffer.trim(Math.max(buffer.getMaxCapacity(), newSize));
}
}
return buffer.appendAndCommit(record);
}

private void addToSubpartition(
BufferBuilder buffer,
int targetSubpartition,
@@ -339,7 +362,7 @@ private BufferBuilder appendUnicastDataForRecordContinuation(
// starting
// with a complete record.
// !! The next two lines can not change order.
-final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
+final int partialRecordBytes = append(remainingRecordBytes, buffer);
addToSubpartition(buffer, targetSubpartition, partialRecordBytes, partialRecordBytes);

return buffer;
@@ -354,7 +377,7 @@ private BufferBuilder appendBroadcastDataForNewRecord(final ByteBuffer record)
createBroadcastBufferConsumers(buffer, 0, record.remaining());
}

-buffer.appendAndCommit(record);
+append(record, buffer);

return buffer;
}
@@ -368,7 +391,7 @@ private BufferBuilder appendBroadcastDataForRecordContinuation(
// starting
// with a complete record.
// !! The next two lines can not change order.
-final int partialRecordBytes = buffer.appendAndCommit(remainingRecordBytes);
+final int partialRecordBytes = append(remainingRecordBytes, buffer);
createBroadcastBufferConsumers(buffer, partialRecordBytes, partialRecordBytes);

return buffer;
@@ -477,6 +477,27 @@ void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
}
}

@Test
void testEmitRecordExpandsLastBuffer() throws IOException {
int recordSize = 10;
int maxBufferSize = 2 * recordSize;
// create a pool with just 1 buffer - so that the test times out in case of back-pressure
NetworkBufferPool globalPool = new NetworkBufferPool(1, maxBufferSize);
BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
ResultPartition resultPartition =
new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
resultPartition.setup();
// emulate BufferDebloater - and suggest small buffer size
resultPartition.createSubpartitionView(0, () -> {}).notifyNewBufferSize(1);
// need to insert two records: the 1st one expands the buffer regardless of back-pressure
resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
// insert the 2nd record:
// - the buffer should still be available for writing after the previous record
// - it should be resized again to fit the new record fully
// - so no new buffer is necessary and there is no back-pressure
resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
}

@Test
void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
BufferWritingResultPartition bufferWritingResultPartition =
