Skip to content

Commit

Permalink
[hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent
Browse files Browse the repository at this point in the history
Previously in case of more data available, re-enquing a channel or an inputGate
was done in a separate critical section, resulting with more complicated concurrency
contract (critical section split into two). Side effect of this change
is that now recursive getNextBuffer/pollNextBufferOrEvent are happening also
under the lock, however they are non-blocking, so this shouldn't cause any issues.
  • Loading branch information
pnowojski committed May 10, 2019
1 parent cd13fdf commit 5b84dbf
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,21 +552,20 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

currentChannel = inputChannelsWithData.remove();
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());

result = currentChannel.getNextBuffer();

if (result.isPresent() && result.get().moreAvailable()) {
// enqueue the currentChannel at the end to avoid starvation
inputChannelsWithData.add(currentChannel);
} else {
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
}

moreAvailable = !inputChannelsWithData.isEmpty();
}

result = currentChannel.getNextBuffer();
} while (!result.isPresent());

// this channel was now removed from the non-empty channels queue
// we re-add it in case it has more data, because in that case no "non-empty" notification
// will come for that channel
if (result.get().moreAvailable()) {
queueChannel(currentChannel);
moreAvailable = true;
}

final Buffer buffer = result.get().buffer();
numBytesIn.inc(buffer.getSizeUnsafe());
if (buffer.isBuffer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,6 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
InputGate inputGate = inputGateWithData.inputGate;
BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;

if (bufferOrEvent.moreAvailable()) {
// this buffer or event was now removed from the non-empty gates queue
// we re-add it in case it has more data, because in that case no "non-empty" notification
// will come for that gate
queueInputGate(inputGate);
}

if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
Expand All @@ -213,8 +206,6 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO

private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
while (true) {
InputGate inputGate;
boolean moreInputGatesAvailable;
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
if (blocking) {
Expand All @@ -223,15 +214,24 @@ private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) th
return Optional.empty();
}
}
inputGate = inputGatesWithData.remove();
enqueuedInputGatesWithData.remove(inputGate);
moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0;
}
final InputGate inputGate = inputGatesWithData.remove();

// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();

// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
if (bufferOrEvent.isPresent()) {
return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
// enqueue the inputGate at the end to avoid starvation
inputGatesWithData.add(inputGate);
} else {
enqueuedInputGatesWithData.remove(inputGate);
}

if (bufferOrEvent.isPresent()) {
return Optional.of(new InputGateWithData(
inputGate,
bufferOrEvent.get(),
!enqueuedInputGatesWithData.isEmpty()));
}
}
}
}
Expand Down

0 comments on commit 5b84dbf

Please sign in to comment.