Skip to content

Commit

Permalink
Revert "[FLINK-10727][network] remove unnecessary synchronization in …
Browse files Browse the repository at this point in the history
…SingleInputGate#requestPartitions()"

This reverts commit 194603a because it caused the
creation of sub partitions to fail.
  • Loading branch information
tillrohrmann committed Nov 2, 2018
1 parent 341b62b commit 344df74
Showing 1 changed file with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,28 +477,26 @@ public boolean isFinished() {

@Override
public void requestPartitions() throws IOException, InterruptedException {
if (requestedPartitionsFlag) {
return;
}

synchronized (requestLock) {
if (isReleased) {
throw new IllegalStateException("Already released.");
}
if (!requestedPartitionsFlag) {
if (isReleased) {
throw new IllegalStateException("Already released.");
}

// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
"number of total input channels and the currently set number of input " +
"channels.");
}
// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
"number of total input channels and the currently set number of input " +
"channels.");
}

for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
}
}

requestedPartitionsFlag = true;
requestedPartitionsFlag = true;
}
}

// ------------------------------------------------------------------------
Expand Down

0 comments on commit 344df74

Please sign in to comment.