diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java index 81c420a93c8ab..bf7ea80771cce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java @@ -19,14 +19,12 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; -import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -76,7 +74,12 @@ public void testConcurrentReadStateAndRelease() throws Exception { testConcurrentReadStateAndRelease(isRemote); } - @Ignore("https://issues.apache.org/jira/browse/FLINK-17640") + /** + * Tests that there are no potential deadlock and buffer leak issues while the following actions happen concurrently: + * 1. Task thread processes the recovered state buffer from RecoveredInputChannel. + * 2. Unspilling IO thread reads the recovered state and queues the buffer into RecoveredInputChannel. + * 3. Canceler thread closes the input gate and releases the RecoveredInputChannel. + */ @Test public void testConcurrentReadStateAndProcessAndRelease() throws Exception { testConcurrentReadStateAndProcessAndRelease(isRemote); @@ -149,8 +152,8 @@ private void testConcurrentReadStateAndProcess(boolean isRemote) throws Exceptio inputGate.setInputChannels(inputChannel); inputGate.setup(); - final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states); - final Callable readStateTask = readRecoveredStateTask(inputChannel, reader); + final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states, false); + final Callable readStateTask = readRecoveredStateTask(inputChannel, reader, false); submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask}); } catch (Throwable t) { @@ -181,7 +184,7 @@ private void testConcurrentReadStateAndRelease(boolean isRemote) throws Exceptio submitTasksAndWaitForResults( executor, - new Callable[] {readRecoveredStateTask(inputChannel, reader), releaseChannelTask(inputChannel)}); + new Callable[] {readRecoveredStateTask(inputChannel, reader, true), releaseChannelTask(inputChannel)}); } catch (Throwable t) { thrown = t; } finally { @@ -207,8 +210,8 @@ private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throw inputGate.setInputChannels(inputChannel); inputGate.setup(); - final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states); - final Callable readStateTask = readRecoveredStateTask(inputChannel, reader); + final Callable processTask = processRecoveredBufferTask(inputChannel, totalStates, states, true); + final Callable readStateTask = readRecoveredStateTask(inputChannel, reader, true); final Callable releaseTask = releaseChannelTask(inputChannel); submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask, releaseTask}); @@ -220,24 +223,26 @@ private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throw } } - private Callable readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader) { + private Callable readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader, boolean verifyRelease) { return () -> { try { inputChannel.readRecoveredState(reader); - } catch (CancelTaskException ex) { - // within expectation + } catch (Throwable t) { + if (!(verifyRelease && inputChannel.isReleased())) { + throw new AssertionError("Exceptions are expected here only if the input channel was released", t); + } } return null; }; } - private Callable processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states) { + private Callable processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) { return () -> { // process all the queued state buffers and verify the data int numProcessedStates = 0; while (numProcessedStates < totalStates) { - if (inputChannel.isReleased()) { + if (verifyRelease && inputChannel.isReleased()) { break; } if (inputChannel.getNumberOfQueuedBuffers() == 0) { @@ -252,9 +257,9 @@ private Callable processRecoveredBufferTask(RecoveredInputChannel inputCha buffer.recycleBuffer(); numProcessedStates++; } - } catch (IllegalStateException e) { - if (!e.getMessage().contains("Queried for a buffer after channel has been closed")) { - throw e; + } catch (Throwable t) { + if (!(verifyRelease && inputChannel.isReleased())) { + throw new AssertionError("Exceptions are expected here only if the input channel was released", t); } } }