Skip to content

Commit

Permalink
[FLINK-17268][connector/common] Fix the shutdown sequence in SplitFet…
Browse files Browse the repository at this point in the history
…cher to avoid blocking on shutdown.

Fix flaky SourceReaderTestBase#testAvailableOnEmptyQueue().
  • Loading branch information
becketqin committed Apr 27, 2020
1 parent b7b2c43 commit 90a38e8
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

private final int id;
private final BlockingDeque<SplitFetcherTask> taskQueue;
// track the assigned splits so we can suspend the reader when there is no splits assigned.
Expand Down Expand Up @@ -140,8 +142,12 @@ void runOnce() {
// Set the running task to null. It is necessary for the shutdown method to avoid
// unnecessarily interrupt the running task.
runningTask = null;
// Clean the interrupt flag in case the running task was interrupted after it finishes
// running but before it was set to null.
Thread.interrupted();
// Set the wakeUp flag to false.
wakeUp.set(false);
LOG.debug("Cleaned wakeup flag.");
}
}

Expand Down Expand Up @@ -236,31 +242,61 @@ void wakeUp(boolean taskOnly) {
// Synchronize to make sure the wake up only works for the current invocation of runOnce().
synchronized (wakeUp) {
// Do not wake up repeatedly.
if (wakeUp.compareAndSet(false, true)) {
// Now the wakeUp flag is set.
SplitFetcherTask currentTask = runningTask;
if (currentTask != null) {
// The running task may have missed our wakeUp flag and running, wake it up.
LOG.debug("Waking up running task {}", currentTask);
currentTask.wakeUp();
} else if (!taskOnly && runningThread != null) {
// The task has not started running yet, and it will not run for this
// runOnce() invocation due to the wakeUp flag. But we might have to
// interrupt the fetcher thread in case it is blocking on the task queue.
LOG.debug("Interrupting fetcher thread.");
// Only interrupt when the thread has started and there is no running task.
runningThread.interrupt();
}
wakeUp.set(true);
// Now the wakeUp flag is set.
SplitFetcherTask currentTask = runningTask;
if (isRunningTask(currentTask)) {
// The running task may have missed our wakeUp flag and running, wake it up.
LOG.debug("Waking up running task {}", currentTask);
currentTask.wakeUp();
} else if (!taskOnly) {
// The task has not started running yet, and it will not run for this
// runOnce() invocation due to the wakeUp flag. But we might have to
// wake up the fetcher thread in case it is blocking on the task queue.
// Only wake up when the thread has started and there is no running task.
LOG.debug("Waking up fetcher thread.");
taskQueue.add(WAKEUP_TASK);
}
}
}

private void maybeEnqueueTask(SplitFetcherTask task) {
// Only enqueue unfinished non-fetch task.
if (!closed.get() && task != null && task != fetchTask && !taskQueue.offerFirst(task)) {
if (!closed.get() && isRunningTask(task) && task != fetchTask && !taskQueue.offerFirst(task)) {
throw new RuntimeException(
"The task queue is full. This is only theoretically possible when really bad thing happens.");
}
LOG.debug("Enqueued task {}", task);
if (task != null) {
LOG.debug("Enqueued task {}", task);
}
}

private boolean isRunningTask(SplitFetcherTask task) {
return task != null && task != WAKEUP_TASK;
}

//--------------------- Helper class ------------------

private static class DummySplitFetcherTask implements SplitFetcherTask {
private final String name;

private DummySplitFetcherTask(String name) {
this.name = name;
}

@Override
public boolean run() throws InterruptedException {
return false;
}

@Override
public void wakeUp() {

}

@Override
public String toString() {
return name;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void testExceptionInSplitReader() throws Exception {
FutureNotifier futureNotifier = new FutureNotifier();
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>(futureNotifier);
MockSourceReader reader = new MockSourceReader(
// We have to handle split changes first, otherwise fetch will not be called.
try (MockSourceReader reader = new MockSourceReader(
futureNotifier,
elementsQueue,
() -> new SplitReader<int[], MockSourceSplit>() {
Expand All @@ -70,18 +71,21 @@ public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChang
}

@Override
public void wakeUp() {}
public void wakeUp() {
}
},
getConfig(),
null);

ValidatingSourceOutput output = new ValidatingSourceOutput();
reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
// This is not a real infinite loop, it is supposed to throw exception after two polls.
while (true) {
reader.pollNext(output);
// Add a sleep to avoid tight loop.
Thread.sleep(1);
null)) {
ValidatingSourceOutput output = new ValidatingSourceOutput();
reader.addSplits(Collections.singletonList(getSplit(0,
NUM_RECORDS_PER_SPLIT,
Boundedness.CONTINUOUS_UNBOUNDED)));
// This is not a real infinite loop, it is supposed to throw exception after two polls.
while (true) {
reader.pollNext(output);
// Add a sleep to avoid tight loop.
Thread.sleep(1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -53,6 +54,15 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
@Rule
public ExpectedException expectedException = ExpectedException.none();

@After
public void ensureNoDangling() {
for (Thread t : Thread.getAllStackTraces().keySet()) {
if (t.getName().equals("SourceFetcher")) {
System.out.println("Dangling thread.");
}
}
}

/**
* Simply test the reader reads all the splits fine.
*/
Expand Down Expand Up @@ -102,15 +112,13 @@ public void testPollingFromEmptyQueue() throws Exception {

@Test (timeout = 30000L)
public void testAvailableOnEmptyQueue() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
List<SplitT> splits = Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
// Consumer all the records in the split.
try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) {
try (SourceReader<Integer, SplitT> reader = createReader()) {
CompletableFuture<?> future = reader.isAvailable();
assertFalse("There should be no records ready for poll.", future.isDone());
// Add a split to the reader so there are more records to be read.
reader.addSplits(Collections.singletonList(getSplit(1, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)));
// THe future should be completed fairly soon. Otherwise the test will hit timeout and fail.
reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)));
// The future should be completed fairly soon. Otherwise the test will hit timeout and fail.
future.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,10 @@ public boolean equals(Object obj) {
Arrays.equals(records.toArray(new Integer[0]), that.records.toArray(new Integer[0])) &&
endIndex == that.endIndex;
}

@Override
public String toString() {
return String.format("MockSourceSplit(id=%d, num_records=%d, endIndex=%d, currentIndex=%d)",
id, records.size(), endIndex, index);
}
}

0 comments on commit 90a38e8

Please sign in to comment.