Skip to content

Commit

Permalink
[hotfix][network] Refactor InputGates code
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed May 10, 2019
1 parent 4527d05 commit 261efa8
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An input gate consumes one or more partitions of a single produced intermediate result.
*
Expand Down Expand Up @@ -112,4 +114,19 @@ protected void resetIsAvailable() {
isAvailable = new CompletableFuture<>();
}
}

/**
* Simple pojo for INPUT, DATA and moreAvailable.
*/
protected static class InputWithData<INPUT, DATA> {
protected final INPUT input;
protected final DATA data;
protected final boolean moreAvailable;

InputWithData(INPUT input, DATA data, boolean moreAvailable) {
this.input = checkNotNull(input);
this.data = checkNotNull(data);
this.moreAvailable = moreAvailable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,21 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

requestPartitions();
Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
}

InputChannel currentChannel;
boolean moreAvailable;
Optional<BufferAndAvailability> result = Optional.empty();
InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
inputWithData.input));
}

do {
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
synchronized (inputChannelsWithData) {
while (inputChannelsWithData.size() == 0) {
if (isReleased) {
Expand All @@ -550,29 +559,38 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}
}

currentChannel = inputChannelsWithData.remove();
InputChannel inputChannel = inputChannelsWithData.remove();

result = currentChannel.getNextBuffer();
Optional<BufferAndAvailability> result = inputChannel.getNextBuffer();

if (result.isPresent() && result.get().moreAvailable()) {
// enqueue the currentChannel at the end to avoid starvation
inputChannelsWithData.add(currentChannel);
// enqueue the inputChannel at the end to avoid starvation
inputChannelsWithData.add(inputChannel);
} else {
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
}

moreAvailable = !inputChannelsWithData.isEmpty();

if (!moreAvailable) {
if (inputChannelsWithData.isEmpty()) {
resetIsAvailable();
}

if (result.isPresent()) {
return Optional.of(new InputWithData<>(
inputChannel,
result.get(),
!inputChannelsWithData.isEmpty()));
}
}
} while (!result.isPresent());
}
}

final Buffer buffer = result.get().buffer();
private BufferOrEvent transformToBufferOrEvent(
Buffer buffer,
boolean moreAvailable,
InputChannel currentChannel) throws IOException, InterruptedException {
numBytesIn.inc(buffer.getSizeUnsafe());
if (buffer.isBuffer()) {
return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
Expand All @@ -591,11 +609,10 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

currentChannel.notifySubpartitionConsumed();

currentChannel.releaseAllResources();
}

return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,36 +182,22 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
// Make sure to request the partitions, if they have not been requested before.
requestPartitions();

Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
}

InputGateWithData inputGateWithData = next.get();
InputGate inputGate = inputGateWithData.inputGate;
BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
InputWithData<InputGate, BufferOrEvent> inputWithData = next.get();

if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {

checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}

// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);

bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable);

return Optional.of(bufferOrEvent);
handleEndOfPartitionEvent(inputWithData.data, inputWithData.input);
return Optional.of(adjustForUnionInputGate(
inputWithData.data,
inputWithData.input,
inputWithData.moreAvailable));
}

private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
private Optional<InputWithData<InputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
Expand Down Expand Up @@ -242,7 +228,7 @@ private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) th
}

if (bufferOrEvent.isPresent()) {
return Optional.of(new InputGateWithData(
return Optional.of(new InputWithData<>(
inputGate,
bufferOrEvent.get(),
!inputGatesWithData.isEmpty()));
Expand All @@ -251,15 +237,29 @@ private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) th
}
}

private static class InputGateWithData {
private final InputGate inputGate;
private final BufferOrEvent bufferOrEvent;
private final boolean moreInputGatesAvailable;
private BufferOrEvent adjustForUnionInputGate(
BufferOrEvent bufferOrEvent,
InputGate inputGate,
boolean moreInputGatesAvailable) {
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);

bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || moreInputGatesAvailable);

InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) {
this.inputGate = checkNotNull(inputGate);
this.bufferOrEvent = checkNotNull(bufferOrEvent);
this.moreInputGatesAvailable = moreInputGatesAvailable;
return bufferOrEvent;
}

private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {

checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
}

Expand Down

0 comments on commit 261efa8

Please sign in to comment.