Skip to content

Commit

Permalink
[FLINK-21996][refactor] Make NumberSequenceSource extensible to allow…
Browse files Browse the repository at this point in the history
… specifying the number of desired sequence splits.
  • Loading branch information
StephanEwen committed Apr 14, 2021
1 parent ea4b391 commit d6253f4
Showing 1 changed file with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -81,6 +82,18 @@ public NumberSequenceSource(long from, long to) {
this.to = to;
}

/**
 * Returns the {@code from} value of this source, i.e. the first argument that is handed to
 * the range splitting when the enumerator is created.
 */
public long getFrom() {
    return this.from;
}

/**
 * Returns the {@code to} value this source was constructed with, i.e. the second argument
 * that is handed to the range splitting when the enumerator is created.
 */
public long getTo() {
    return this.to;
}

// ------------------------------------------------------------------------
// source methods
// ------------------------------------------------------------------------

@Override
public TypeInformation<Long> getProducedType() {
return Types.LONG;
Expand All @@ -100,19 +113,8 @@ public SourceReader<Long, NumberSequenceSplit> createReader(SourceReaderContext
/**
 * Creates the enumerator that hands out the splits of this source.
 *
 * <p>The range {@code [from, to]} is divided via {@link #splitNumberRange(long, long, int)},
 * using the current parallelism reported by the enumerator context as the requested number
 * of splits. The resulting splits are passed to an {@code IteratorSourceEnumerator}.
 *
 * @param enumContext the enumerator context; supplies the current parallelism
 * @return an enumerator over the splits computed for this source's number range
 */
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(
        final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {

    // The splitting logic lives in splitNumberRange(...) only, so that there is a single
    // place to customize how many splits are produced and how they are built.
    final List<NumberSequenceSplit> splits =
            splitNumberRange(from, to, enumContext.currentParallelism());
    return new IteratorSourceEnumerator<>(enumContext, splits);
}

Expand All @@ -134,6 +136,23 @@ public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() {
return new CheckpointSerializer();
}

/**
 * Divides the number range described by {@code from} and {@code to} into splits.
 *
 * <p>The range is wrapped in a {@code NumberSequenceIterator}, which is asked to split itself
 * into {@code numSplits} sub-sequences. Every sub-sequence that still has elements becomes
 * one {@code NumberSequenceSplit}; sub-sequences without elements are dropped. Split IDs are
 * the decimal strings {@code "1"}, {@code "2"}, ... assigned in order to the splits that are
 * actually emitted, so the IDs stay consecutive even when sub-sequences are skipped.
 *
 * @param from the start of the number range
 * @param to the end of the number range
 * @param numSplits the number of sub-sequences requested from the iterator
 * @return the non-empty splits, in the order returned by the iterator
 */
protected List<NumberSequenceSplit> splitNumberRange(long from, long to, int numSplits) {
    final NumberSequenceIterator[] parts = new NumberSequenceIterator(from, to).split(numSplits);
    final List<NumberSequenceSplit> result = new ArrayList<>(parts.length);

    int nextId = 1;
    for (int i = 0; i < parts.length; i++) {
        final NumberSequenceIterator part = parts[i];
        if (!part.hasNext()) {
            // nothing to read in this sub-sequence, so it gets no split (and no ID)
            continue;
        }
        result.add(
                new NumberSequenceSplit(
                        String.valueOf(nextId), part.getCurrent(), part.getTo()));
        nextId++;
    }

    return result;
}

// ------------------------------------------------------------------------
// splits & checkpoint
// ------------------------------------------------------------------------
Expand Down

0 comments on commit d6253f4

Please sign in to comment.