Skip to content

Commit

Permalink
[FLINK-14214][runtime] Shortcut isAvailable().isDone() anti-starvatio…
Browse files Browse the repository at this point in the history
…n check in StreamTwoInputProcessor

This probably doesn't speed up the code, but it's how the isAvailable() was intended to be used.
  • Loading branch information
pnowojski committed Oct 2, 2019
1 parent b1ac77d commit bed28ab
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ private int selectNextReadingInputIndex() throws IOException {

// to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL,
// always try to check and set the availability of another input
// TODO: because this can be a costly operation (checking volatile inside CompletableFuture`
// this might be optimized to only check once per processed NetworkBuffer
if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
checkAndSetAvailable(1 - readingInputIndex);
}
Expand Down Expand Up @@ -293,9 +291,15 @@ private void updateAvailability(InputStatus status, StreamTaskInput input) {
}

private void checkAndSetAvailable(int inputIndex) {
StreamTaskInput input = getInput(inputIndex);
InputStatus status = (inputIndex == 0 ? firstInputStatus : secondInputStatus);
if (status != InputStatus.END_OF_INPUT && input.isAvailable().isDone()) {
if (status == InputStatus.END_OF_INPUT) {
return;
}
CompletableFuture<?> inputAvailable = getInput(inputIndex).isAvailable();
// TODO: inputAvailable.isDone() can be a costly operation (checking volatile). If one of
// the input is constantly available and another is not, we will be checking this volatile
// once per every record. This might be optimized to only check once per processed NetworkBuffer
if (inputAvailable == AVAILABLE || inputAvailable.isDone()) {
inputSelectionHandler.setAvailableInput(inputIndex);
}
}
Expand Down

0 comments on commit bed28ab

Please sign in to comment.