Skip to content

Commit

Permalink
[FLINK-20194] Change SourceReaderBase.onSplitFinished() to take a map…
Browse files Browse the repository at this point in the history
… of SplitId -> SplitState.
  • Loading branch information
becketqin committed Nov 24, 2020
1 parent 1a9212c commit 2e2f637
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -172,11 +171,12 @@ private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final Reader
final Set<String> finishedSplits = fetch.finishedSplits();
if (!finishedSplits.isEmpty()) {
LOG.info("Finished reading split(s) {}", finishedSplits);
Map<String, SplitStateT> stateOfFinishedSplits = new HashMap<>();
for (String finishedSplitId : finishedSplits) {
splitStates.remove(finishedSplitId);
stateOfFinishedSplits.put(finishedSplitId, splitStates.remove(finishedSplitId).state);
output.releaseOutputForSplit(finishedSplitId);
}
onSplitFinished(finishedSplits);
onSplitFinished(stateOfFinishedSplits);
}

fetch.recycle();
Expand Down Expand Up @@ -251,7 +251,7 @@ public int getNumberOfCurrentlyAssignedSplits() {
/**
* Handles the finished splits to clean the state if needed.
*/
protected abstract void onSplitFinished(Collection<String> finishedSplitIds);
protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds);

/**
* When new splits are added to the reader. The initialize the state of the new splits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand Down Expand Up @@ -296,7 +296,7 @@ private Configuration getConfig() {
public void notifyCheckpointComplete(long checkpointId) {}

@Override
protected void onSplitFinished(Collection<String> finishedSplitIds) {}
protected void onSplitFinished(Map<String, TestingSourceSplit> finishedSplitIds) {}

@Override
protected TestingSourceSplit initializedState(TestingSourceSplit split) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

Expand All @@ -52,7 +52,7 @@ public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>
}

@Override
protected void onSplitFinished(Collection<String> finishedSplitIds) {
protected void onSplitFinished(Map<String, AtomicInteger> finishedSplitIds) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;

import java.util.Collection;
import java.util.Map;

/**
* A {@link SourceReader} that read records from {@link FileSourceSplit}.
Expand All @@ -54,7 +54,7 @@ public void start() {
}

@Override
protected void onSplitFinished(Collection<String> finishedSplitIds) {
protected void onSplitFinished(Map<String, FileSourceSplitState<SplitT>> finishedSplitIds) {
context.sendSplitRequest();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,7 +66,7 @@ public KafkaSourceReader(
}

@Override
protected void onSplitFinished(Collection<String> finishedSplitIds) {
protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) {

}

Expand Down

0 comments on commit 2e2f637

Please sign in to comment.