Skip to content

Commit

Permalink
[FLINK-19717][connectors/common] Fix spurious InputStatus.END_OF_INPU…
Browse files Browse the repository at this point in the history
…T from SourceReaderBase.pollNext caused by split reader exception (apache#13776)
  • Loading branch information
kezhuw committed Nov 9, 2020
1 parent 0b3f15e commit 2cce1ac
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ private InputStatus finishedOrAvailableLater() {
return InputStatus.NOTHING_AVAILABLE;
}
if (elementsQueue.isEmpty()) {
// We may reach here because a split fetcher exited with an exception; check for that first.
splitFetcherManager.checkErrors();
return InputStatus.END_OF_INPUT;
} else {
throw new IllegalStateException("Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +32,7 @@
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* The internal fetcher runnable responsible for polling messages from the external system.
Expand All @@ -47,6 +47,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final Map<String, SplitT> assignedSplits;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final SplitReader<E, SplitT> splitReader;
private final Consumer<Throwable> errorHandler;
private final Runnable shutdownHook;
private final AtomicBoolean wakeUp;
private final AtomicBoolean closed;
Expand All @@ -62,13 +63,15 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
int id,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitReader<E, SplitT> splitReader,
Consumer<Throwable> errorHandler,
Runnable shutdownHook) {

this.id = id;
this.taskQueue = new LinkedBlockingDeque<>();
this.elementsQueue = elementsQueue;
this.assignedSplits = new HashMap<>();
this.splitReader = splitReader;
this.errorHandler = errorHandler;
this.shutdownHook = shutdownHook;
this.isIdle = true;
this.wakeUp = new AtomicBoolean(false);
Expand All @@ -91,14 +94,19 @@ public void run() {
while (!closed.get()) {
runOnce();
}
} catch (Throwable t) {
errorHandler.accept(t);
} finally {
LOG.info("Split fetcher {} exited.", id);
shutdownHook.run();
try {
splitReader.close();
} catch (Exception e) {
ExceptionUtils.rethrow(e);
errorHandler.accept(e);
}
LOG.info("Split fetcher {} exited.", id);
// This executes after any errorHandler.accept(t) call above. If the two operations bear
// a happens-before relation, then a thread that observes the side effect of shutdownHook.run()
// can check the side effect of errorHandler.accept(t) to know whether an error was reported.
shutdownHook.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.ThrowableCatchingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,7 +111,7 @@ public void accept(Throwable t) {
public abstract void addSplits(List<SplitT> splitsToAdd);

/**
 * Hands the given fetcher to {@code executors} for execution.
 *
 * <p>NOTE(review): this is a diff hunk rendered without +/- markers, so both the old
 * submit (fetcher wrapped in a {@code ThrowableCatchingRunnable} with {@code errorHandler})
 * and the new submit (the bare fetcher) appear below. Presumably only the second line
 * remains after the commit -- confirm against the actual file.
 *
 * @param fetcher the split fetcher to run.
 */
protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
executors.submit(new ThrowableCatchingRunnable(errorHandler, fetcher));
executors.submit(fetcher);
}

/**
Expand All @@ -133,6 +132,7 @@ protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
fetcherId,
elementsQueue,
splitReader,
errorHandler,
() -> fetchers.remove(fetcherId));
fetchers.put(fetcherId, splitFetcher);
return splitFetcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
import org.apache.flink.core.io.InputStatus;

import org.junit.Rule;
import org.junit.Test;
Expand All @@ -45,6 +47,7 @@
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -87,9 +90,11 @@ public void close() throws Exception {}
reader.addSplits(Collections.singletonList(getSplit(0,
NUM_RECORDS_PER_SPLIT,
Boundedness.CONTINUOUS_UNBOUNDED)));
reader.handleSourceEvents(new NoMoreSplitsEvent());
// This is not a real infinite loop; it is supposed to throw an exception after two polls.
while (true) {
reader.pollNext(output);
InputStatus inputStatus = reader.pollNext(output);
assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
// Add a sleep to avoid tight loop.
Thread.sleep(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.ExceptionUtils;

import org.junit.Test;

Expand Down Expand Up @@ -164,6 +165,7 @@ public void testWakeup() throws InterruptedException {
0,
elementQueue,
new MockSplitReader(2, true),
ExceptionUtils::rethrow,
() -> {});

// Prepare the splits.
Expand Down Expand Up @@ -252,7 +254,7 @@ private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
/**
 * Test helper that builds a {@code SplitFetcher} with id 0 over the given reader and queue,
 * using a no-op shutdown hook.
 *
 * <p>NOTE(review): this is a diff hunk rendered without +/- markers, so both the old
 * four-argument constructor call and the new five-argument call (which additionally passes
 * {@code ExceptionUtils::rethrow} as the error handler) appear below. Presumably only the
 * second {@code return} remains after the commit -- confirm against the actual file.
 *
 * @param reader the split reader the fetcher polls from.
 * @param queue the queue the fetcher is constructed with.
 * @return the newly constructed fetcher.
 */
private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
final SplitReader<E, TestingSourceSplit> reader,
final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
return new SplitFetcher<>(0, queue, reader, () -> {});
return new SplitFetcher<>(0, queue, reader, ExceptionUtils::rethrow, () -> {});
}

private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
Expand Down

0 comments on commit 2cce1ac

Please sign in to comment.