-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplement Pub/Sub Lite's I/O using UnboundedSource. #22612
Reimplement Pub/Sub Lite's I/O using UnboundedSource. #22612
Conversation
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
R: @chamikaramj |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
...latform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
Show resolved
Hide resolved
...atform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
Show resolved
Hide resolved
if (!subscriber.state().equals(State.RUNNING)) { | ||
throw new IOException("Subscriber failed: ", subscriber.failureCause()); | ||
} | ||
if (advanced) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the subscriber already set to the first record the first time the start() is called ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is if the record is ready- this seems to be the correct behavior and match KafkaUnboundedReader?
...atform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
Outdated
Show resolved
Hide resolved
Codecov Report
@@ Coverage Diff @@
## master #22612 +/- ##
==========================================
+ Coverage 74.19% 74.20% +0.01%
==========================================
Files 706 708 +2
Lines 93229 93462 +233
==========================================
+ Hits 69168 69355 +187
- Misses 22793 22832 +39
- Partials 1268 1275 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more |
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
Probably someone from Dataflow streaming team should take a quick look as well. @scwhittle will you be able to check ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnboundedSource implementation looks good to me but I'd prefer someone more familiar with Java to review as well
...cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ApiServices.java
Show resolved
Hide resolved
...form/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/BlockingCommitterImpl.java
Show resolved
Hide resolved
...form/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/BlockingCommitterImpl.java
Outdated
Show resolved
Hide resolved
...latform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
Show resolved
Hide resolved
...latform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
Show resolved
Hide resolved
...atform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
Show resolved
Hide resolved
...atform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
Show resolved
Hide resolved
...atform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
Show resolved
Hide resolved
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
Run PostCommit_Java_Dataflow |
Run PostCommit_Java_DataflowV2 |
Thanks Sam. I think we can merge this when tests pass. LGTM. Based on an offline discussion with Daniel, we'll be adding a transform override in a follow-up PR to keep SDF version the default while overriding with UnboundedSource-based version for production (Runner v1) Dataflow. |
Run Java_GCP_IO_Direct PreCommit |
Run PostCommit_Java_Dataflow |
Run PostCommit_Java_Dataflow |
Run PostCommit_Java_DataflowV2 |
Run Java_GCP_IO_Direct PreCommit |
Run PostCommit_Java_DataflowV2 |
Run PostCommit_Java_Dataflow |
Pls fix spotless. @@ -18,7 +18,6 @@ |
...latform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
Show resolved
Hide resolved
Run PostCommit_Java_DataflowV2 |
Run PostCommit_Java_Dataflow |
Done. |
SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.