Skip to content

Commit

Permalink
[BEAM-2939, BEAM-10057] Ensure that we can process an EmptyUnboundedS…
Browse files Browse the repository at this point in the history
…ource and also prevent splitting on it. (apache#11781)
  • Loading branch information
lukecwik committed May 26, 2020
1 parent f452e09 commit d86219f
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ public void close() throws IOException {}

@Override
public Instant getWatermark() {
throw new UnsupportedOperationException("getWatermark is never meant to be invoked.");
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

@Override
Expand Down Expand Up @@ -836,16 +836,20 @@ public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
@Override
public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(
double fractionOfRemainder) {
// Don't split if we have claimed all since the SDF wrapper will be finishing soon.
// Don't split if we have the empty sources since the SDF wrapper will be finishing soon.
UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
if (currentRestriction.getSource() instanceof EmptyUnboundedSource) {
return null;
}

// Our split result sets the primary to have no checkpoint mark associated
// with it since when we resume we don't have any state but we specifically pass
// the checkpoint mark to the current reader so that when we finish the current bundle
// we may register for finalization.
UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction();
SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result =
SplitResult.of(
UnboundedSourceRestriction.create(
EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()),
EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
currentRestriction);
currentReader =
EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
Expand Down

0 comments on commit d86219f

Please sign in to comment.