diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 855f85ddd3fe3..2f113078fe330 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -96,12 +96,8 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { new SourceReaderFinishedEvent(currentSourceIndex)); if (!isFinalSource) { // More splits may arrive for a subsequent reader. - // InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the - // availability future after receiving more splits to resume. - if (availabilityFuture.isDone()) { - // reset to avoid continued polling - availabilityFuture = new CompletableFuture(); - } + // InputStatus.NOTHING_AVAILABLE suspends poll, complete the + // availability future in the switchover to the next reader return InputStatus.NOTHING_AVAILABLE; } } @@ -133,6 +129,10 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { @Override public CompletableFuture isAvailable() { + if (availabilityFuture.isDone()) { + availabilityFuture = currentReader.isAvailable(); + } + return availabilityFuture; } @@ -185,10 +185,6 @@ public void handleSourceEvents(SourceEvent sourceEvent) { switchedSources.put(sse.sourceIndex(), sse.source()); setCurrentReader(sse.sourceIndex()); isFinalSource = sse.isFinalSource(); - if (!availabilityFuture.isDone()) { - // continue polling - availabilityFuture.complete(null); - } } else { currentReader.handleSourceEvents(sourceEvent); } @@ -231,16 +227,7 @@ private void setCurrentReader(int index) { reader.start(); currentSourceIndex = index; currentReader = reader; - currentReader - .isAvailable() - .whenComplete( - (result, ex) -> { - if (ex == null) { - availabilityFuture.complete(result); - } else { - availabilityFuture.completeExceptionally(ex); - } - }); + availabilityFuture.complete(null); LOG.debug( "Reader started: subtask={} sourceIndex={} {}", readerContext.getIndexOfSubtask(), diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index b9c2f5661156f..02231378a3678 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -24,7 +24,14 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader; +import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.core.io.InputStatus; @@ -35,6 +42,8 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -120,6 +129,120 @@ public SourceReader createReader( reader.close(); } + @Test + public void testAvailabilityFutureSwitchover() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + // 2 underlying readers to exercise switch + MutableFutureSourceReader mockSplitReader1 = + MutableFutureSourceReader.createReader(readerContext); + MutableFutureSourceReader mockSplitReader2 = + MutableFutureSourceReader.createReader(readerContext); + + HybridSourceReader reader = new HybridSourceReader<>(readerContext); + + assertThat(readerContext.getSentEvents()).isEmpty(); + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + assertThat(currentReader(reader)).isNull(); + assertThat(reader.pollNext(readerOutput)).isEqualTo(InputStatus.NOTHING_AVAILABLE); + CompletableFuture hybridSourceFutureBeforeFirstReader = reader.isAvailable(); + assertThat(hybridSourceFutureBeforeFirstReader).isNotDone(); + + Source source1 = + new MockSource(null, 0) { + @Override + public SourceReader createReader( + SourceReaderContext readerContext) { + return mockSplitReader1; + } + }; + reader.handleSourceEvents(new SwitchSourceEvent(0, source1, false)); + assertThat(hybridSourceFutureBeforeFirstReader) + .isDone() + .as("the previous underlying future should be completed after switch event"); + + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1); + mockSplit.addRecord(0); + + SwitchedSources switchedSources = new SwitchedSources(); + switchedSources.put(0, source); + HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources); + reader.addSplits(Collections.singletonList(hybridSplit)); + + // drain splits + CompletableFuture futureBeforeDraining = reader.isAvailable(); + mockSplitReader1.completeFuture(); + assertThat(futureBeforeDraining) + .isDone() + .as("underlying future is complete and hybrid source should poll"); + + InputStatus status = reader.pollNext(readerOutput); + while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) { + status = reader.pollNext(readerOutput); + Thread.sleep(10); + } + // mock reader no more records + mockSplitReader1.resetFuture(); + + CompletableFuture futureAfterDraining = reader.isAvailable(); + assertThat(futureBeforeDraining) + .isNotEqualTo(futureAfterDraining) + .as("Future should have been refreshed since the previous future is complete"); + assertThat(futureAfterDraining).isNotDone().as("Future should not be complete"); + + assertThat(readerOutput.getEmittedRecords()).contains(0); + reader.pollNext(readerOutput); + assertThat(reader.pollNext(readerOutput)) + .as("before notifyNoMoreSplits") + .isEqualTo(InputStatus.NOTHING_AVAILABLE); + + reader.notifyNoMoreSplits(); + reader.pollNext(readerOutput); + assertAndClearSourceReaderFinishedEvent(readerContext, 0); + + assertThat(futureAfterDraining) + .isNotDone() + .as("still no more records and runtime should not poll"); + + assertThat(currentReader(reader)) + .as("reader before switch source event") + .isEqualTo(mockSplitReader1); + + Source source2 = + new MockSource(null, 0) { + @Override + public SourceReader createReader( + SourceReaderContext readerContext) { + return mockSplitReader2; + } + }; + + reader.handleSourceEvents(new SwitchSourceEvent(1, source2, true)); + assertThat(futureAfterDraining) + .isDone() + .as("switching should signal completion to poll the new reader"); + CompletableFuture futureReader2 = reader.isAvailable(); + + // futures must be different + assertThat(futureBeforeDraining).isNotSameAs(futureReader2); + assertThat(currentReader(reader)) + .as("reader after switch source event") + .isEqualTo(mockSplitReader2); + + reader.notifyNoMoreSplits(); + assertThat(reader.pollNext(readerOutput)) + .as("reader 1 after notifyNoMoreSplits") + .isEqualTo(InputStatus.END_OF_INPUT); + assertThat(futureReader2) + .isSameAs(reader.isAvailable()) + .as("future should not have been refreshed"); + + reader.close(); + } + @Test public void testReaderRecovery() throws Exception { TestingReaderContext readerContext = new TestingReaderContext(); @@ -201,4 +324,47 @@ private static void assertAndClearSourceReaderFinishedEvent( .isEqualTo(sourceIndex); context.clearSentEvents(); } + + private static class MutableFutureSourceReader extends MockSourceReader { + + private CompletableFuture availabilityFuture = new CompletableFuture<>(); + + public MutableFutureSourceReader( + FutureCompletingBlockingQueue> elementsQueue, + Supplier> splitFetcherSupplier, + Configuration config, + SourceReaderContext context) { + super(elementsQueue, splitFetcherSupplier, config, context); + } + + public static MutableFutureSourceReader createReader(SourceReaderContext readerContext) { + FutureCompletingBlockingQueue> elementsQueue = + new FutureCompletingBlockingQueue<>(); + + Configuration config = new Configuration(); + config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2); + config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); + MockSplitReader.Builder builder = + MockSplitReader.newBuilder() + .setNumRecordsPerSplitPerFetch(2) + .setBlockingFetch(true); + return new MutableFutureSourceReader( + elementsQueue, builder::build, config, readerContext); + } + + @Override + public CompletableFuture isAvailable() { + return availabilityFuture; + } + + public void completeFuture() { + availabilityFuture.complete(null); + } + + public void resetFuture() { + if (this.availabilityFuture.isDone()) { + availabilityFuture = new CompletableFuture<>(); + } + } + } }