Skip to content

Commit

Permalink
[FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
Browse files Browse the repository at this point in the history
This closes apache#13366
  • Loading branch information
becketqin authored and StephanEwen committed Sep 15, 2020
1 parent 7375589 commit a5b0d32
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private final FutureNotifier futureNotifier;

/** A queue to buffer the elements fetched by the fetcher thread. */
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

/** The state of the splits. */
private final Map<String, SplitContext<T, SplitStateT>> splitStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,33 @@
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/**
* The default fetch task that fetches the records into the element queue.
*/
class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<E, SplitT> splitReader;
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final Consumer<Collection<String>> splitFinishedCallback;
private final Thread runningThread;
private final int fetcherIndex;
private volatile RecordsWithSplitIds<E> lastRecords;
private volatile boolean wakeup;

FetchTask(
SplitReader<E, SplitT> splitReader,
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Consumer<Collection<String>> splitFinishedCallback,
Thread runningThread) {
SplitReader<E, SplitT> splitReader,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Consumer<Collection<String>> splitFinishedCallback,
int fetcherIndex) {
this.splitReader = splitReader;
this.elementsQueue = elementsQueue;
this.splitFinishedCallback = splitFinishedCallback;
this.lastRecords = null;
this.runningThread = runningThread;
this.fetcherIndex = fetcherIndex;
this.wakeup = false;
}

Expand All @@ -61,18 +61,18 @@ public boolean run() throws InterruptedException, IOException {
if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
elementsQueue.put(lastRecords);
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
lastRecords = null;
if (elementsQueue.put(fetcherIndex, lastRecords)) {
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
lastRecords = null;
}
}
} finally {
// clean up the potential wakeup effect. It is possible that the fetcher is waken up
// after the clean up. In that case, either the wakeup flag will be set or the
// running thread will be interrupted. The next invocation of run() will see that and
// just skip.
if (isWakenUp()) {
Thread.interrupted();
wakeup = false;
}
}
Expand All @@ -93,12 +93,12 @@ public void wakeUp() {
splitReader.wakeUp();
} else {
// The task might be blocking on enqueuing the records, just interrupt.
runningThread.interrupt();
elementsQueue.wakeUpPuttingThread(fetcherIndex);
}
}

private boolean isWakenUp() {
return wakeup || runningThread.isInterrupted();
return wakeup;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,7 +34,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final Map<String, SplitT> assignedSplits;
/** The current split assignments for this fetcher. */
private final Queue<SplitsChange<SplitT>> splitChanges;
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final SplitReader<E, SplitT> splitReader;
private final Runnable shutdownHook;
private final AtomicBoolean wakeUp;
private final AtomicBoolean closed;
private FetchTask<E, SplitT> fetchTask;
private volatile Thread runningThread;
private volatile SplitFetcherTask runningTask = null;
private volatile boolean isIdle;

SplitFetcher(
int id,
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitReader<E, SplitT> splitReader,
Runnable shutdownHook) {
int id,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitReader<E, SplitT> splitReader,
Runnable shutdownHook) {

this.id = id;
this.taskQueue = new LinkedBlockingDeque<>();
Expand All @@ -83,14 +82,13 @@ public void run() {
LOG.info("Starting split fetcher {}", id);
try {
// Remove the split from the assignments if it is already done.
runningThread = Thread.currentThread();
this.fetchTask = new FetchTask<>(
splitReader,
elementsQueue,
ids -> {
ids.forEach(this::removeAssignedSplit);
updateIsIdle();
}, runningThread);
splitReader,
elementsQueue,
ids -> {
ids.forEach(assignedSplits::remove);
updateIsIdle();
}, id);
while (!closed.get()) {
runOnce();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
private final AtomicReference<Throwable> uncaughtFetcherException;

/** The element queue that the split fetchers will put elements into. */
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

/** A map keeping track of all the split fetchers. */
protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
Expand Down
Loading

0 comments on commit a5b0d32

Please sign in to comment.