[FLINK-18595][network] Fix the deadlock of concurrently recycling buffer and releasing input channel

Assuming two remote channels whose buffer managers (bm1, bm2) are registered as listeners in LocalBufferPool, the deadlock happens as follows:
1. While the Canceler thread is calling ch1#releaseAllResources, it holds bm1's bufferQueue lock and tries to call bm2#notifyBufferAvailable.
2. While the task thread is recycling an exclusive buffer for ch2, it holds bm2's bufferQueue lock and tries to call bm1#notifyBufferAvailable.
3. Each thread holds its own bm's bufferQueue lock and waits for the other's bufferQueue lock, which results in a deadlock.
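
The three steps above describe a lock-order inversion. The following is a minimal, self-contained sketch of that pattern and is not Flink code: the class `LockOrderInversionSketch`, its methods, and the two `ReentrantLock` instances standing in for the bufferQueue monitors are all hypothetical. It uses `tryLock` with a timeout so the demonstration terminates; with plain `synchronized` blocks, as in the scenario above, both threads would block forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the scenario in the commit message; not Flink code.
class LockOrderInversionSketch {

    // Takes `own`, waits until the peer holds its lock too, then tries `other` with a timeout.
    static boolean holdOwnThenTryOther(
            ReentrantLock own, ReentrantLock other,
            CountDownLatch bothHold, CountDownLatch bothTried) {
        own.lock();
        try {
            bothHold.countDown();
            bothHold.await();                       // both threads now hold their own lock
            boolean acquired = other.tryLock(100, TimeUnit.MILLISECONDS);
            if (acquired) {
                other.unlock();
            }
            bothTried.countDown();
            bothTried.await();                      // keep `own` held until the peer has tried
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            own.unlock();
        }
    }

    // Returns how many of the two threads managed to take the second lock.
    static int run() {
        ReentrantLock bm1Queue = new ReentrantLock();   // stands in for bm1's bufferQueue lock
        ReentrantLock bm2Queue = new ReentrantLock();   // stands in for bm2's bufferQueue lock
        CountDownLatch bothHold = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] acquired = new boolean[2];

        // "Canceler": holds bm1's lock, then wants bm2's.
        Thread canceler = new Thread(() ->
                acquired[0] = holdOwnThenTryOther(bm1Queue, bm2Queue, bothHold, bothTried));
        // "Task thread": holds bm2's lock, then wants bm1's.
        Thread task = new Thread(() ->
                acquired[1] = holdOwnThenTryOther(bm2Queue, bm1Queue, bothHold, bothTried));

        canceler.start();
        task.start();
        try {
            canceler.join();
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return (acquired[0] ? 1 : 0) + (acquired[1] ? 1 : 0);
    }

    public static void main(String[] args) {
        System.out.println("threads that got the second lock: " + run());
    }
}
```

In this sketch neither thread can obtain the second lock, because each lock stays held until both attempts have finished. That is the same cycle as in step 3.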

To resolve this, BufferManager#notifyBufferAvailable checks the released state outside of the bufferQueue lock and returns immediately if the channel has already been released.
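
The idea of checking the released state before taking the lock can be sketched in isolation. This is an illustrative sketch under stated assumptions, not the Flink implementation: `EarlyReleaseCheckSketch`, its `Result` enum, the `AtomicBoolean` flag, and the `String` buffers are hypothetical stand-ins. The re-check inside the synchronized block is also an addition of this sketch and is not taken from the diff.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical listener that checks a released flag before taking its queue lock.
class EarlyReleaseCheckSketch {
    enum Result { BUFFER_NOT_USED, BUFFER_USED }

    private final ArrayDeque<String> bufferQueue = new ArrayDeque<>();
    private final AtomicBoolean released = new AtomicBoolean(false);

    void release() {
        released.set(true);
    }

    Result notifyBufferAvailable(String buffer) {
        // Early exit without touching the bufferQueue monitor: a released
        // listener never waits on a lock that another thread may hold.
        if (released.get()) {
            return Result.BUFFER_NOT_USED;
        }
        synchronized (bufferQueue) {
            // Sketch-only re-check for a release that raced with the early check.
            if (released.get()) {
                return Result.BUFFER_NOT_USED;
            }
            bufferQueue.add(buffer);
            return Result.BUFFER_USED;
        }
    }

    int queued() {
        synchronized (bufferQueue) {
            return bufferQueue.size();
        }
    }

    // Shows that a released listener returns even while its queue lock is held elsewhere.
    static boolean returnsWhileLockHeld() {
        EarlyReleaseCheckSketch listener = new EarlyReleaseCheckSketch();
        listener.release();
        Thread notifier = new Thread(() -> listener.notifyBufferAvailable("buffer"));
        synchronized (listener.bufferQueue) {       // this thread holds the lock
            notifier.start();
            try {
                notifier.join(1000);                // wait up to one second
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return !notifier.isAlive();             // true if the call did not block
        }
    }

    public static void main(String[] args) {
        System.out.println("returned while lock held: " + returnsWhileLockHeld());
    }
}
```

The early check only helps when the listener has already been marked released. A listener that is still active takes the lock as before.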
zhijiangW committed Jul 29, 2020
1 parent 335c47e commit c137102
Showing 1 changed file with 10 additions and 0 deletions.
@@ -254,6 +254,16 @@ void releaseAllBuffers(ArrayDeque<Buffer> buffers) throws IOException {
    @Override
    public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
        BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.BUFFER_NOT_USED;

        // Assume two remote channels whose buffer managers are both registered as listeners in LocalBufferPool.
        // While the canceler thread is calling ch1#releaseAllResources, it might trigger bm2#notifyBufferAvailable.
        // Concurrently, if the task thread is recycling an exclusive buffer, it might trigger bm1#notifyBufferAvailable.
        // Each thread would then hold its own bufferQueue lock while waiting for the other's bufferQueue lock,
        // causing a deadlock. Checking the isReleased state outside of the synchronized block resolves this.
        if (inputChannel.isReleased()) {
            return notificationResult;
        }

        try {
            synchronized (bufferQueue) {
                checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");