Skip to content

Commit

Permalink
[FLINK-27479] add logic to manage the availability future with respect to the underlying reader future
Browse files Browse the repository at this point in the history

formatting
  • Loading branch information
mas-chen authored and tweise committed Oct 9, 2022
1 parent c0df986 commit 9dad202
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,8 @@ public InputStatus pollNext(ReaderOutput output) throws Exception {
new SourceReaderFinishedEvent(currentSourceIndex));
if (!isFinalSource) {
// More splits may arrive for a subsequent reader.
// InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the
// availability future after receiving more splits to resume.
if (availabilityFuture.isDone()) {
// reset to avoid continued polling
availabilityFuture = new CompletableFuture();
}
// InputStatus.NOTHING_AVAILABLE suspends poll, complete the
// availability future in the switchover to the next reader
return InputStatus.NOTHING_AVAILABLE;
}
}
Expand Down Expand Up @@ -133,6 +129,10 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {

/**
 * Returns the availability future of this reader.
 *
 * <p>While the cached future is still pending it is returned unchanged. Once it has completed,
 * it is replaced by the future obtained from the current underlying reader, and that new future
 * is returned.
 *
 * @return the future callers should wait on before polling again
 */
@Override
public CompletableFuture<Void> isAvailable() {
    // A pending future is handed back untouched.
    if (!availabilityFuture.isDone()) {
        return availabilityFuture;
    }

    // The cached future has completed: refresh it from the current reader.
    availabilityFuture = currentReader.isAvailable();
    return availabilityFuture;
}

Expand Down Expand Up @@ -185,10 +185,6 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
switchedSources.put(sse.sourceIndex(), sse.source());
setCurrentReader(sse.sourceIndex());
isFinalSource = sse.isFinalSource();
if (!availabilityFuture.isDone()) {
// continue polling
availabilityFuture.complete(null);
}
} else {
currentReader.handleSourceEvents(sourceEvent);
}
Expand Down Expand Up @@ -231,16 +227,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
currentReader
.isAvailable()
.whenComplete(
(result, ex) -> {
if (ex == null) {
availabilityFuture.complete(result);
} else {
availabilityFuture.completeExceptionally(ex);
}
});
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
readerContext.getIndexOfSubtask(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
Expand All @@ -35,6 +42,8 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -120,6 +129,120 @@ public SourceReader<Integer, MockSourceSplit> createReader(
reader.close();
}

/**
 * Exercises the availability future exposed by {@link HybridSourceReader} while it switches
 * between two underlying readers whose own availability futures are controlled by the test.
 *
 * <p>Checked in order: the future is pending before any reader exists, it completes on the
 * first switch event, it follows the underlying reader's future while draining, it stays
 * pending after the first reader finished, it completes on the second switch event, and it is
 * not refreshed while the second reader's future is still pending.
 */
@Test
public void testAvailabilityFutureSwitchover() throws Exception {
    TestingReaderContext readerContext = new TestingReaderContext();
    TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
    MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);

    // 2 underlying readers to exercise switch
    MutableFutureSourceReader mockSplitReader1 =
            MutableFutureSourceReader.createReader(readerContext);
    MutableFutureSourceReader mockSplitReader2 =
            MutableFutureSourceReader.createReader(readerContext);

    HybridSourceReader<Integer> reader = new HybridSourceReader<>(readerContext);

    // before any switch event: no underlying reader, nothing to poll, future pending
    assertThat(readerContext.getSentEvents()).isEmpty();
    reader.start();
    assertAndClearSourceReaderFinishedEvent(readerContext, -1);
    assertThat(currentReader(reader)).isNull();
    assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE);
    CompletableFuture<Void> hybridSourceFutureBeforeFirstReader = reader.isAvailable();
    assertThat(hybridSourceFutureBeforeFirstReader).isNotDone();

    Source source1 =
            new MockSource(null, 0) {
                @Override
                public SourceReader<Integer, MockSourceSplit> createReader(
                        SourceReaderContext readerContext) {
                    return mockSplitReader1;
                }
            };
    reader.handleSourceEvents(new SwitchSourceEvent(0, source1, false));
    // NOTE: as(...) must precede the assertion, otherwise AssertJ ignores the description
    assertThat(hybridSourceFutureBeforeFirstReader)
            .as("the previous underlying future should be completed after switch event")
            .isDone();

    MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
    mockSplit.addRecord(0);

    SwitchedSources switchedSources = new SwitchedSources();
    switchedSources.put(0, source);
    HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);
    reader.addSplits(Collections.singletonList(hybridSplit));

    // drain splits
    CompletableFuture<Void> futureBeforeDraining = reader.isAvailable();
    mockSplitReader1.completeFuture();
    assertThat(futureBeforeDraining)
            .as("underlying future is complete and hybrid source should poll")
            .isDone();

    // keep polling until the record has been emitted and the reader reports no more data
    InputStatus status = reader.pollNext(readerOutput);
    while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) {
        status = reader.pollNext(readerOutput);
        Thread.sleep(10);
    }
    // mock reader no more records
    mockSplitReader1.resetFuture();

    CompletableFuture<Void> futureAfterDraining = reader.isAvailable();
    assertThat(futureBeforeDraining)
            .as("Future should have been refreshed since the previous future is complete")
            .isNotEqualTo(futureAfterDraining);
    assertThat(futureAfterDraining).as("Future should not be complete").isNotDone();

    assertThat(readerOutput.getEmittedRecords()).contains(0);
    reader.pollNext(readerOutput);
    assertThat(reader.pollNext(readerOutput))
            .as("before notifyNoMoreSplits")
            .isEqualTo(InputStatus.NOTHING_AVAILABLE);

    reader.notifyNoMoreSplits();
    reader.pollNext(readerOutput);
    assertAndClearSourceReaderFinishedEvent(readerContext, 0);

    assertThat(futureAfterDraining)
            .as("still no more records and runtime should not poll")
            .isNotDone();

    assertThat(currentReader(reader))
            .as("reader before switch source event")
            .isEqualTo(mockSplitReader1);

    Source source2 =
            new MockSource(null, 0) {
                @Override
                public SourceReader<Integer, MockSourceSplit> createReader(
                        SourceReaderContext readerContext) {
                    return mockSplitReader2;
                }
            };

    reader.handleSourceEvents(new SwitchSourceEvent(1, source2, true));
    assertThat(futureAfterDraining)
            .as("switching should signal completion to poll the new reader")
            .isDone();
    CompletableFuture<Void> futureReader2 = reader.isAvailable();

    // futures must be different
    assertThat(futureBeforeDraining).isNotSameAs(futureReader2);
    assertThat(currentReader(reader))
            .as("reader after switch source event")
            .isEqualTo(mockSplitReader2);

    reader.notifyNoMoreSplits();
    assertThat(reader.pollNext(readerOutput))
            .as("reader 1 after notifyNoMoreSplits")
            .isEqualTo(InputStatus.END_OF_INPUT);
    // the second reader's future was never completed, so no refresh is expected
    assertThat(futureReader2)
            .as("future should not have been refreshed")
            .isSameAs(reader.isAvailable());

    reader.close();
}

@Test
public void testReaderRecovery() throws Exception {
TestingReaderContext readerContext = new TestingReaderContext();
Expand Down Expand Up @@ -201,4 +324,47 @@ private static void assertAndClearSourceReaderFinishedEvent(
.isEqualTo(sourceIndex);
context.clearSentEvents();
}

/**
 * A {@link MockSourceReader} whose availability future is owned by the test: it starts out
 * pending and only changes through {@link #completeFuture()} and {@link #resetFuture()}.
 */
private static class MutableFutureSourceReader extends MockSourceReader {

    /** Future returned by {@link #isAvailable()}; swapped for a fresh one on reset. */
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    public MutableFutureSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
            Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherSupplier, config, context);
    }

    /**
     * Builds a reader backed by a blocking {@link MockSplitReader} that fetches two records per
     * split per fetch, with an element queue capacity of 2 and a 30000 close timeout.
     *
     * @param readerContext context handed to the created reader
     * @return a new reader with a pending availability future
     */
    public static MutableFutureSourceReader createReader(SourceReaderContext readerContext) {
        Configuration readerConfig = new Configuration();
        readerConfig.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2);
        readerConfig.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);

        MockSplitReader.Builder splitReaderBuilder = MockSplitReader.newBuilder();
        splitReaderBuilder = splitReaderBuilder.setNumRecordsPerSplitPerFetch(2);
        splitReaderBuilder = splitReaderBuilder.setBlockingFetch(true);

        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> queue =
                new FutureCompletingBlockingQueue<>();

        return new MutableFutureSourceReader(
                queue, splitReaderBuilder::build, readerConfig, readerContext);
    }

    /** Returns the test-controlled future instead of the one of the parent reader. */
    @Override
    public CompletableFuture<Void> isAvailable() {
        return availabilityFuture;
    }

    /** Completes the current availability future with a {@code null} result. */
    public void completeFuture() {
        availabilityFuture.complete(null);
    }

    /** Replaces a completed availability future with a pending one; a pending one is kept. */
    public void resetFuture() {
        if (!this.availabilityFuture.isDone()) {
            return;
        }
        availabilityFuture = new CompletableFuture<>();
    }
}
}

0 comments on commit 9dad202

Please sign in to comment.