Skip to content

Commit

Permalink
[FLINK-17640][network][tests] Fix the unstable unit tests in Recovere…
Browse files Browse the repository at this point in the history
…dInputChannelTest
  • Loading branch information
zhijiangW committed May 14, 2020
1 parent c781d17 commit 3771835
Showing 1 changed file with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -76,7 +74,12 @@ public void testConcurrentReadStateAndRelease() throws Exception {
testConcurrentReadStateAndRelease(isRemote);
}

@Ignore("https://issues.apache.org/jira/browse/FLINK-17640")
/**
* Tests that there are no potential deadlock and buffer leak issues while the following actions happen concurrently:
* 1. Task thread processes the recovered state buffer from RecoveredInputChannel.
* 2. Unspilling IO thread reads the recovered state and queues the buffer into RecoveredInputChannel.
* 3. Canceler thread closes the input gate and releases the RecoveredInputChannel.
*/
@Test
public void testConcurrentReadStateAndProcessAndRelease() throws Exception {
testConcurrentReadStateAndProcessAndRelease(isRemote);
Expand Down Expand Up @@ -149,8 +152,8 @@ private void testConcurrentReadStateAndProcess(boolean isRemote) throws Exceptio
inputGate.setInputChannels(inputChannel);
inputGate.setup();

final Callable<Void> processTask = processRecoveredBufferTask(inputChannel, totalStates, states);
final Callable<Void> readStateTask = readRecoveredStateTask(inputChannel, reader);
final Callable<Void> processTask = processRecoveredBufferTask(inputChannel, totalStates, states, false);
final Callable<Void> readStateTask = readRecoveredStateTask(inputChannel, reader, false);

submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask});
} catch (Throwable t) {
Expand Down Expand Up @@ -181,7 +184,7 @@ private void testConcurrentReadStateAndRelease(boolean isRemote) throws Exceptio

submitTasksAndWaitForResults(
executor,
new Callable[] {readRecoveredStateTask(inputChannel, reader), releaseChannelTask(inputChannel)});
new Callable[] {readRecoveredStateTask(inputChannel, reader, true), releaseChannelTask(inputChannel)});
} catch (Throwable t) {
thrown = t;
} finally {
Expand All @@ -207,8 +210,8 @@ private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throw
inputGate.setInputChannels(inputChannel);
inputGate.setup();

final Callable<Void> processTask = processRecoveredBufferTask(inputChannel, totalStates, states);
final Callable<Void> readStateTask = readRecoveredStateTask(inputChannel, reader);
final Callable<Void> processTask = processRecoveredBufferTask(inputChannel, totalStates, states, true);
final Callable<Void> readStateTask = readRecoveredStateTask(inputChannel, reader, true);
final Callable<Void> releaseTask = releaseChannelTask(inputChannel);

submitTasksAndWaitForResults(executor, new Callable[] {readStateTask, processTask, releaseTask});
Expand All @@ -220,24 +223,26 @@ private void testConcurrentReadStateAndProcessAndRelease(boolean isRemote) throw
}
}

private Callable<Void> readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader) {
private Callable<Void> readRecoveredStateTask(RecoveredInputChannel inputChannel, ChannelStateReader reader, boolean verifyRelease) {
return () -> {
try {
inputChannel.readRecoveredState(reader);
} catch (CancelTaskException ex) {
// within expectation
} catch (Throwable t) {
if (!(verifyRelease && inputChannel.isReleased())) {
throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
}
}

return null;
};
}

private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states) {
private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputChannel, int totalStates, int[] states, boolean verifyRelease) {
return () -> {
// process all the queued state buffers and verify the data
int numProcessedStates = 0;
while (numProcessedStates < totalStates) {
if (inputChannel.isReleased()) {
if (verifyRelease && inputChannel.isReleased()) {
break;
}
if (inputChannel.getNumberOfQueuedBuffers() == 0) {
Expand All @@ -252,9 +257,9 @@ private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel inputCha
buffer.recycleBuffer();
numProcessedStates++;
}
} catch (IllegalStateException e) {
if (!e.getMessage().contains("Queried for a buffer after channel has been closed")) {
throw e;
} catch (Throwable t) {
if (!(verifyRelease && inputChannel.isReleased())) {
throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
}
}
}
Expand Down

0 comments on commit 3771835

Please sign in to comment.