Skip to content

Commit

Permalink
[FLINK-16641][network] (Part#2) Distinguish data buffer and event buf…
Browse files Browse the repository at this point in the history
…fer for BoundedBlockingSubpartitionDirectTransferReader

Currently, the BoundedBlockingSubpartitionDirectTransferReader does not distinguish data buffer and event buffer but it does not allocate floating credits for events, which means it relies on at least one exclusive credit to send the events. This patch changes the logic and distinguishes data buffer and event buffer for BoundedBlockingSubpartitionDirectTransferReader, after which the BoundedBlockingSubpartitionDirectTransferReader does not rely on the exclusive credits any more and we can set the exclusive credit to 0 after we finish FLINK-16641.
  • Loading branch information
wsry authored and pnowojski committed Jul 12, 2021
1 parent 3a46604 commit 31e1cd1
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class BoundedBlockingSubpartitionDirectTransferReader implements ResultSu
int numDataBuffers,
int numDataAndEventBuffers)
throws IOException {

checkArgument(numDataAndEventBuffers - numDataBuffers == 1, "Too many event buffers.");
this.parent = checkNotNull(parent);

checkNotNull(filePath);
Expand Down Expand Up @@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {

updateStatistics(current);

// We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
// next header
Buffer.DataType nextDataType =
numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
// We simply assume all the data except for the last one (EndOfPartitionEvent)
// are non-events for batch jobs to avoid pre-fetching the next header
Buffer.DataType nextDataType = Buffer.DataType.NONE;
if (numDataBuffers > 0) {
nextDataType = Buffer.DataType.DATA_BUFFER;
} else if (numDataAndEventBuffers > 0) {
nextDataType = Buffer.DataType.EVENT_BUFFER;
}
return BufferAndBacklog.fromBufferAndLookahead(
current, nextDataType, numDataBuffers, sequenceNumber++);
}
Expand All @@ -110,7 +114,7 @@ private void updateStatistics(Buffer buffer) {
public boolean isAvailable(int numCreditsAvailable) {
// We simply assume there are no events except EndOfPartitionEvent for bath jobs,
// then it has no essential effect to ignore the judgement of next event buffer.
return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
return (numCreditsAvailable > 0 || numDataBuffers == 0) && numDataAndEventBuffers > 0;
}

@Override
Expand Down

0 comments on commit 31e1cd1

Please sign in to comment.