Skip to content

Commit

Permalink
[FLINK-17823][network] Resolve the race condition while releasing Rem…
Browse files Browse the repository at this point in the history
…oteInputChannel

RemoteInputChannel#releaseAllResources might be called by canceler thread. Meanwhile, the task thread can also call RemoteInputChannel#getNextBuffer.
There probably cause two potential problems:

1. Task thread might get null buffer after canceler thread already released all the buffers, then it might cause misleading NPE in getNextBuffer.
2. Task thread and canceler thread might pull the same buffer concurrently, which causes unexpected exception when the same buffer is recycled twice.

The solution is to properly synchronize the buffer queue in release method to avoid the same buffer pulled by both canceler thread and task thread.
And in getNextBuffer method, we add some explicit checks to avoid misleading NPE and hint some valid exceptions.
  • Loading branch information
zhijiangW committed Jun 2, 2020
1 parent 1c78ab3 commit 8c7c726
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
Expand Down Expand Up @@ -168,7 +169,6 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

checkError();
Expand All @@ -181,6 +181,14 @@ Optional<BufferAndAvailability> getNextBuffer() throws IOException {
moreAvailable = !receivedBuffers.isEmpty();
}

if (next == null) {
if (isReleased.get()) {
throw new CancelTaskException("Queried for a buffer after channel has been released.");
} else {
throw new IllegalStateException("There should always have queued buffers for unreleased channel.");
}
}

numBytesIn.inc(next.getSize());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next, moreAvailable, 0));
Expand Down Expand Up @@ -242,9 +250,10 @@ public boolean isReleased() {
void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {

ArrayDeque<Buffer> releasedBuffers;
final ArrayDeque<Buffer> releasedBuffers;
synchronized (receivedBuffers) {
releasedBuffers = receivedBuffers;
releasedBuffers = new ArrayDeque<>(receivedBuffers);
receivedBuffers.clear();
}
bufferManager.releaseAllBuffers(releasedBuffers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -1010,6 +1011,56 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
}
}

@Test
public void testConcurrentGetNextBufferAndRelease() throws Exception {
final int numTotalBuffers = 1_000;
final int numExclusiveBuffers = 2;
final int numFloatingBuffers = 998;
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32, numExclusiveBuffers);
final SingleInputGate inputGate = createSingleInputGate(1, networkBufferPool);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannels(inputChannel);

final ExecutorService executor = Executors.newFixedThreadPool(2);
Throwable thrown = null;
try {
BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments();
inputChannel.requestSubpartition(0);

for (int i = 0; i < numTotalBuffers; i++) {
Buffer buffer = inputChannel.requestBuffer();
inputChannel.onBuffer(buffer, i, 0);
}

final Callable<Void> getNextBufferTask = () -> {
try {
for (int i = 0; i < numTotalBuffers; ++i) {
Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = inputChannel.getNextBuffer();
bufferAndAvailability.ifPresent(buffer -> buffer.buffer().recycleBuffer());
}
} catch (Throwable t) {
if (!inputChannel.isReleased()) {
throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
}
}
return null;
};

final Callable<Void> releaseTask = () -> {
inputChannel.releaseAllResources();
return null;
};

submitTasksAndWaitForResults(executor, new Callable[] {getNextBufferTask, releaseTask});
} catch (Throwable t) {
thrown = t;
} finally {
cleanup(networkBufferPool, executor, null, thrown, inputChannel);
}
}

/**
* Tests that {@link RemoteInputChannel#retriggerSubpartitionRequest(int)} would throw
* the {@link PartitionNotFoundException} if backoff is 0.
Expand Down

0 comments on commit 8c7c726

Please sign in to comment.