Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Pub/Sub Lite's I/O using UnboundedSource. #22612

Merged
merged 8 commits into from
Aug 10, 2022
Prev Previous commit
Next Next commit
Reimplement Pub/Sub Lite's I/O using UnboundedSource.
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
  • Loading branch information
dpcollins-google committed Aug 8, 2022
commit 6ccf85deda277e9aa9d9af3a63775c17569a6514
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,18 @@ public boolean advance() throws IOException {
throw new IOException("Subscriber failed: ", subscriber.failureCause());
}
if (advanced) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the subscriber already set to the first record the first time the start() is called ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is if the record is ready- this seems to be the correct behavior and match KafkaUnboundedReader?

SequencedMessage peeked = subscriber.peek().get();
Offset nextOffset = Offset.of(peeked.getCursor().getOffset() + 1);
checkState(nextOffset.value() > fetchOffset.value());
fetchOffset = nextOffset;
lastMessageTimestamp = Optional.of(getTimestamp(peeked));
subscriber.pop();
}
advanced = subscriber.peek().isPresent();
return advanced;
Optional<SequencedMessage> next = subscriber.peek();
advanced = next.isPresent();
if (!advanced) {
return false;
}
Offset nextOffset = Offset.of(next.get().getCursor().getOffset() + 1);
checkState(nextOffset.value() > fetchOffset.value());
fetchOffset = nextOffset;
lastMessageTimestamp = Optional.of(getTimestamp(next.get()));
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,14 @@ public void setUp() {

@Test
public void startAdvances() throws Exception {
SequencedMessage message = messageWith("abc", 2, Instant.now());
Instant ts = Instant.now();
SequencedMessage message = messageWith("abc", 2, ts);
doReturn(Optional.of(message)).when(subscriber).peek();
assertTrue(reader.start());
verify(subscriber).startAsync();
verify(subscriber).awaitRunning(1, TimeUnit.MINUTES);
assertEquals(reader.getCurrent(), message);
assertEquals(reader.getWatermark(), ts);
}

@Test
Expand All @@ -120,6 +122,7 @@ public void startAdvancesNoMessage() throws Exception {
verify(subscriber).startAsync();
verify(subscriber).awaitRunning(1, TimeUnit.MINUTES);
assertThrows(NoSuchElementException.class, reader::getCurrent);
assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, reader.getWatermark());
}

@Test
Expand All @@ -142,7 +145,7 @@ public void advanceWithPreviousValue() throws Exception {
assertTrue(reader.advance());
assertEquals(reader.getCurrent(), message1);
assertEquals(reader.getCurrentTimestamp(), ts1);
assertEquals(reader.getWatermark(), BoundedWindow.TIMESTAMP_MIN_VALUE);
assertEquals(reader.getWatermark(), ts1);
doAnswer(
(Answer<Void>)
args -> {
Expand Down