Skip to content

Commit

Permalink
[FLINK-17268][connector/common] Fix the bug of missing records during…
Browse files Browse the repository at this point in the history
… SplitFetcher wakeup. Added unit test for SplitFetcher.
  • Loading branch information
becketqin committed Apr 27, 2020
1 parent a9f8a0b commit 237a63f
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<E, SplitT> splitReader;
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final Consumer<Collection<String>> splitFinishedCallback;
private RecordsWithSplitIds<E> lastRecords;
private Thread runningThread;
private final Thread runningThread;
private volatile RecordsWithSplitIds<E> lastRecords;
private volatile boolean wakeup;

FetchTask(
Expand All @@ -52,33 +52,54 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {

@Override
public boolean run() throws InterruptedException {
if (lastRecords == null) {
lastRecords = splitReader.fetch();
}
if (!wakeup) {
elementsQueue.put(lastRecords);
splitFinishedCallback.accept(lastRecords.finishedSplits());
}
synchronized (this) {
wakeup = false;
lastRecords = null;
try {
if (!isWakenUp() && lastRecords == null) {
lastRecords = splitReader.fetch();
}

if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
elementsQueue.put(lastRecords);
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
lastRecords = null;
}
} finally {
// Clean up the potential wakeup effect. It is possible that the fetcher is woken up
// after the clean up. In that case, either the wakeup flag will be set or the
// running thread will be interrupted. The next invocation of run() will see that and
// just skip.
if (isWakenUp()) {
Thread.interrupted();
wakeup = false;
}
}
// The return value of fetch task does not matter.
return true;
}

@Override
public void wakeUp() {
synchronized (this) {
wakeup = true;
if (lastRecords == null) {
splitReader.wakeUp();
} else {
runningThread.interrupt();
}
// Set the wakeup flag first.
wakeup = true;
if (lastRecords == null) {
// Two possible cases:
// 1. The splitReader is reading or is about to read the records.
// 2. The records have been enqueued and set to null.
// In case 1, we just wake up the split reader. In case 2, the next run might be skipped.
// In any case, the records won't be enqueued in the ongoing run().
splitReader.wakeUp();
} else {
// The task might be blocking on enqueuing the records, just interrupt.
runningThread.interrupt();
}
}

/**
 * Tells whether this task has been asked to wake up.
 *
 * @return {@code true} if the wakeup flag is set or the running thread currently has its
 *     interrupt status set; {@code false} otherwise.
 */
private boolean isWakenUp() {
    // Check the flag first; the thread's interrupt status is only consulted when the flag is unset.
    if (wakeup) {
        return true;
    }
    return runningThread.isInterrupted();
}

@Override
public String toString() {
return "FetchTask";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ boolean shouldRunFetchTask() {
* synchronize to ensure the wake up logic does not touch a different invocation.
*/
void wakeUp(boolean taskOnly) {
// Synchronize to make sure the wake up only work for the current invocation of runOnce().
// Synchronize to make sure the wake up only works for the current invocation of runOnce().
synchronized (wakeUp) {
// Do not wake up repeatedly.
if (wakeUp.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,6 +73,9 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
/** An executor service with two threads. One for the fetcher and one for the future completing thread. */
private final ExecutorService executors;

/** Indicates whether the split fetcher manager has been closed. */
private volatile boolean closed;

/**
* Create a split fetcher manager.
*
Expand Down Expand Up @@ -100,11 +103,12 @@ public void accept(Throwable t) {
this.splitReaderFactory = splitReaderFactory;
this.uncaughtFetcherException = new AtomicReference<>(null);
this.fetcherIdGenerator = new AtomicInteger(0);
this.fetchers = new HashMap<>();
this.fetchers = new ConcurrentHashMap<>();

// Create the executor with a thread factory that fails the source reader if one of
// the fetcher threads exits abnormally.
this.executors = Executors.newCachedThreadPool(r -> new Thread(r, "SourceFetcher"));
this.closed = false;
}

public abstract void addSplits(List<SplitT> splitsToAdd);
Expand All @@ -113,7 +117,16 @@ protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
executors.submit(new ThrowableCatchingRunnable(errorHandler, fetcher));
}

protected SplitFetcher<E, SplitT> createSplitFetcher() {
/**
* Synchronized method to ensure no fetcher is created after the split fetcher manager has closed.
*
* @return the created split fetcher.
* @throws IllegalStateException if the split fetcher manager has closed.
*/
protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
if (closed) {
throw new IllegalStateException("The split fetcher manager has closed.");
}
// Create SplitReader.
SplitReader<E, SplitT> splitReader = splitReaderFactory.get();

Expand All @@ -127,7 +140,14 @@ protected SplitFetcher<E, SplitT> createSplitFetcher() {
return splitFetcher;
}

public void close(long timeoutMs) throws Exception {
/**
* Close the split fetcher manager.
*
* @param timeoutMs the max time in milliseconds to wait.
* @throws Exception when failed to close the split fetcher manager.
*/
public synchronized void close(long timeoutMs) throws Exception {
closed = true;
fetchers.values().forEach(SplitFetcher::shutdown);
executors.shutdown();
if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T

protected static final int NUM_SPLITS = 10;
protected static final int NUM_RECORDS_PER_SPLIT = 10;
protected static final int TOTAL_NUM_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;

@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand All @@ -57,57 +58,61 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
*/
@Test
public void testRead() throws Exception {
SourceReader<Integer, SplitT> reader = createReader();
reader.addSplits(getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
ValidatingSourceOutput output = new ValidatingSourceOutput();
while (output.count < 100) {
reader.pollNext(output);
try (SourceReader<Integer, SplitT> reader = createReader()) {
reader.addSplits(getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
ValidatingSourceOutput output = new ValidatingSourceOutput();
while (output.count < TOTAL_NUM_RECORDS) {
reader.pollNext(output);
}
output.validate();
}
output.validate();
}

@Test
public void testAddSplitToExistingFetcher() throws Exception {
Thread.sleep(10);
ValidatingSourceOutput output = new ValidatingSourceOutput();
// Add a split to start the fetcher.
List<SplitT> splits = Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
// Poll 5 records and let it block on the element queue, which only has a capacity of 1.
SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, 5, Boundedness.BOUNDED);
List<SplitT> newSplits = new ArrayList<>();
for (int i = 1; i < NUM_SPLITS; i++) {
newSplits.add(getSplit(i, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
}
reader.addSplits(newSplits);

while (output.count() < 100) {
reader.pollNext(output);
try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, 5, Boundedness.BOUNDED)) {
List<SplitT> newSplits = new ArrayList<>();
for (int i = 1; i < NUM_SPLITS; i++) {
newSplits.add(getSplit(i, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
}
reader.addSplits(newSplits);

while (output.count() < NUM_RECORDS_PER_SPLIT * NUM_SPLITS) {
reader.pollNext(output);
}
output.validate();
}
output.validate();
}

@Test (timeout = 30000L)
public void testPollingFromEmptyQueue() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
List<SplitT> splits = Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
// Consume all the records in the split.
SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED);
// Now let the main thread poll again.
assertEquals("The status should be ", SourceReader.Status.AVAILABLE_LATER, reader.pollNext(output));
try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)) {
// Now let the main thread poll again.
assertEquals("The status should be ", SourceReader.Status.AVAILABLE_LATER, reader.pollNext(output));
}
}

@Test (timeout = 30000L)
public void testAvailableOnEmptyQueue() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
List<SplitT> splits = Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED));
// Consume all the records in the split.
SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED);

CompletableFuture<?> future = reader.isAvailable();
assertFalse("There should be no records ready for poll.", future.isDone());
// Add a split to the reader so there are more records to be read.
reader.addSplits(Collections.singletonList(getSplit(1, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)));
// The future should be completed fairly soon. Otherwise the test will hit timeout and fail.
future.get();
try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)) {
CompletableFuture<?> future = reader.isAvailable();
assertFalse("There should be no records ready for poll.", future.isDone());
// Add a split to the reader so there are more records to be read.
reader.addSplits(Collections.singletonList(getSplit(1, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)));
// The future should be completed fairly soon. Otherwise the test will hit timeout and fail.
future.get();
}
}

@Test (timeout = 30000L)
Expand All @@ -116,13 +121,14 @@ public void testSnapshot() throws Exception {
// Add a split to start the fetcher.
List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED);
// Poll 5 records. That means split 0 and 1 will at index 2, split 1 will at index 1.
SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED);

List<SplitT> state = reader.snapshotState();
assertEquals("The snapshot should only have 10 splits. ", NUM_SPLITS, state.size());
for (int i = 0; i < NUM_SPLITS; i++) {
assertEquals("The first four splits should have been fully consumed.", NUM_RECORDS_PER_SPLIT, getIndex(state.get(i)));
try (SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED)) {
List<SplitT> state = reader.snapshotState();
assertEquals("The snapshot should only have 10 splits. ", NUM_SPLITS, state.size());
for (int i = 0; i < NUM_SPLITS; i++) {
assertEquals("The first four splits should have been fully consumed.",
NUM_RECORDS_PER_SPLIT, getIndex(state.get(i)));
}
}
}

Expand Down Expand Up @@ -177,10 +183,12 @@ public void collect(Integer element, long timestamp) {
}

public void validate() {
assertEquals("Should be 100 distinct elements in total", 100, consumedValues.size());
assertEquals("Should be 100 elements in total", 100, count);

assertEquals(String.format("Should be %d distinct elements in total", TOTAL_NUM_RECORDS),
TOTAL_NUM_RECORDS, consumedValues.size());
assertEquals(String.format("Should be %d elements in total", TOTAL_NUM_RECORDS), TOTAL_NUM_RECORDS, count);
assertEquals("The min value should be 0", 0, min);
assertEquals("The max value should be 99", 99, max);
assertEquals("The max value should be " + (TOTAL_NUM_RECORDS - 1), TOTAL_NUM_RECORDS - 1, max);
}

public int count() {
Expand Down
Loading

0 comments on commit 237a63f

Please sign in to comment.