Skip to content

Commit

Permalink
[FLINK-19698][connector/common] Add a close() method to the SplitReader.
Browse files Browse the repository at this point in the history
  • Loading branch information
becketqin committed Nov 5, 2020
1 parent a8ad3e3 commit 7ea3a60
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final Runnable shutdownHook;
private final AtomicBoolean wakeUp;
private final AtomicBoolean closed;
private FetchTask<E, SplitT> fetchTask;
private final FetchTask<E, SplitT> fetchTask;
private volatile SplitFetcherTask runningTask = null;

/** Flag whether this fetcher has no work assigned at the moment.
Expand Down Expand Up @@ -91,8 +92,13 @@ public void run() {
runOnce();
}
} finally {
shutdownHook.run();
LOG.info("Split fetcher {} exited.", id);
shutdownHook.run();
try {
splitReader.close();
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
* {@link #fetch()}.
*/
void wakeUp();

/**
* Close the split reader.
*
* @throws Exception if closing the split reader failed.
*/
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ public RecordsWithSplitIds<int[]> fetch() {
public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}

@Override
public void wakeUp() {
}
public void wakeUp() {}

@Override
public void close() throws Exception {}
},
getConfig(),
null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.OneShotLatch;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -35,6 +37,7 @@
import java.util.Collections;
import java.util.Queue;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
Expand All @@ -57,6 +60,22 @@ public void testExceptionPropagationSuccessiveFetch() throws Exception {
);
}

@Test
public void testCloseFetcherWithException() throws Exception {
TestingSplitReader<Object, TestingSourceSplit> reader = new TestingSplitReader<>();
reader.setCloseWithException();
SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
createFetcher("test-split", new FutureCompletingBlockingQueue<>(), reader);
fetcherManager.close(1000L);
try {
fetcherManager.checkErrors();
} catch (Exception e) {
assertEquals(
"Artificial exception on closing the split reader.",
ExceptionUtils.getRootCause(e).getMessage());
}
}

// the final modifier is important so that '@SafeVarargs' is accepted on Java 8
@SuppressWarnings("FinalPrivateMethod")
@SafeVarargs
Expand Down Expand Up @@ -146,6 +165,9 @@ public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
@Override
public void wakeUp() {}

@Override
public void close() throws Exception {}

public void awaitAllRecordsReturned() throws InterruptedException {
inBlocking.await();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,15 @@ public void run() {
}
}

@Test
public void testClose() {
TestingSplitReader<Object, TestingSourceSplit> splitReader = new TestingSplitReader<>();
final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(splitReader);
fetcher.shutdown();
fetcher.run();
assertTrue(splitReader.isClosed());
}

// ------------------------------------------------------------------------
// testing utils
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public void wakeUp() {
}
}

@Override
public void close() throws Exception {}

private RecordsBySplits<int[]> getRecords() {
final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@
public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> {

private final ArrayDeque<RecordsWithSplitIds<E>> fetches;
private volatile boolean closed;
private volatile boolean closeWithException;

@SafeVarargs
public TestingSplitReader(RecordsWithSplitIds<E>... fetches) {
this.fetches = new ArrayDeque<>(fetches.length);
this.fetches.addAll(Arrays.asList(fetches));
this.closed = false;
this.closeWithException = false;
}

@Override
Expand Down Expand Up @@ -66,4 +70,20 @@ public void wakeUp() {
fetches.notifyAll();
}
}

@Override
public void close() throws Exception {
if (closeWithException) {
throw new Exception("Artificial exception on closing the split reader.");
}
closed = true;
}

public void setCloseWithException() {
closeWithException = true;
}

public boolean isClosed() {
return closed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public void handleSplitsChanges(final SplitsChange<SplitT> splitChange) {
@Override
public void wakeUp() {}

@Override
public void close() throws Exception {}

private void checkSplitOrStartNext() throws IOException {
if (currentReader != null) {
return;
Expand Down

0 comments on commit 7ea3a60

Please sign in to comment.