Skip to content

Commit

Permalink
[hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWi…
Browse files Browse the repository at this point in the history
…thData fields with single LinkedHashSet
  • Loading branch information
pnowojski committed May 10, 2019
1 parent 07ae430 commit a075132
Showing 1 changed file with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -70,13 +70,11 @@ public class UnionInputGate implements InputGate, InputGateListener {

private final Set<InputGate> inputGatesWithRemainingData;

/** Gates, which notified this input gate about available data. */
private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();

/**
* Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}.
* Gates, which notified this input gate about available data. We are using it as a FIFO
* queue of {@link InputGate}s to avoid starvation and provide some basic fairness.
*/
private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>();
private final LinkedHashSet<InputGate> inputGatesWithData = new LinkedHashSet<>();

/** The total number of input channels across all unioned input gates. */
private final int totalNumberOfInputChannels;
Expand Down Expand Up @@ -214,23 +212,24 @@ private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) th
return Optional.empty();
}
}
final InputGate inputGate = inputGatesWithData.remove();

Iterator<InputGate> inputGateIterator = inputGatesWithData.iterator();
final InputGate inputGate = inputGateIterator.next();
inputGateIterator.remove();

// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();

if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
// enqueue the inputGate at the end to avoid starvation
inputGatesWithData.add(inputGate);
} else {
enqueuedInputGatesWithData.remove(inputGate);
}

if (bufferOrEvent.isPresent()) {
return Optional.of(new InputGateWithData(
inputGate,
bufferOrEvent.get(),
!enqueuedInputGatesWithData.isEmpty()));
!inputGatesWithData.isEmpty()));
}
}
}
Expand Down Expand Up @@ -290,14 +289,13 @@ private void queueInputGate(InputGate inputGate) {
int availableInputGates;

synchronized (inputGatesWithData) {
if (enqueuedInputGatesWithData.contains(inputGate)) {
if (inputGatesWithData.contains(inputGate)) {
return;
}

availableInputGates = inputGatesWithData.size();

inputGatesWithData.add(inputGate);
enqueuedInputGatesWithData.add(inputGate);

if (availableInputGates == 0) {
inputGatesWithData.notifyAll();
Expand Down

0 comments on commit a075132

Please sign in to comment.