-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Beam-2535: Support timer output timestamps in the flink runner #10534
Conversation
Run Flink ValidatesRunner |
R: xubii |
Some context: pr/9677 allows users to set their own watermark holds, by tying it to the output timestamp of a timer (this prevents watermark hold from getting "leaked" as when the timer fires the holds are removed). This PR adds this support to the Flink runner as well, which was excluded from the previous PR. |
Run Flink ValidatesRunner |
8 similar comments
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
Run Flink ValidatesRunner |
@mxm this doesn't quite work. Presumably I'm misunderstanding something about how the Flink runner works. Do you have any thoughts on how this might be implemented? |
Thanks @reuvenlax. I'll have a look later today. |
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
@@ -1171,6 +1173,8 @@ private void cancelPendingTimerById(String contextTimerId) throws Exception { | |||
void cleanupPendingTimer(TimerData timer) { | |||
try { | |||
pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace())); | |||
// Always update watermark hold after a timer has been fired or removed | |||
updateWatermarkHold(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the missing piece. Without this, we can't make progress when a timer has fired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I didn't put a call here, because the way the code is structured that would result in two calls to updateWatermarkHold every time you set a timer. Would it be sufficient to add the call in fireTimer, as it appears that's that path we missed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's right. My first thought was to add it in fireTimer
, but then I thought I could simplify it further. That led to the duplicate update call when you set the timer. Looks like we have just update the hold on set
, delete
, and fire
, and not for cleanup
.
Unrelated test failure in https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_PR/170/ |
Run Flink ValidatesRunner |
The portable ValidatesRunner tests are failing. We need to adapt the watermark handling there. Let me check. |
609c9c4
to
657f4f9
Compare
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
Run Flink ValidatesRunner |
Run Apex ValidatesRunner |
Run Samza ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
This appears to work except for the portable runner |
This is an issue with the portable TestStream implementation not working with non-standard coders. Let me see if I can find a fix. |
Run Java Flink PortableValidatesRunner Streaming |
2b817d5
to
fb686f2
Compare
Run Java Flink PortableValidatesRunner Streaming |
fb686f2
to
806be60
Compare
Run Java Flink PortableValidatesRunner Streaming |
1 similar comment
Run Java Flink PortableValidatesRunner Streaming |
Run Flink ValidatesRunner |
Run Java PreCommit |
5d58c2e
to
df68f73
Compare
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
@mxm I believe this now supports both the old and the portable flink runners |
Run Java PreCommit |
Run JavaPortabilityApi PreCommit |
There are still some test failures: After those are fixed please revert #10610 in this PR to reactivate the new timer output timestamp test. |
6ed6448
to
ec8aa24
Compare
Run Java PreCommit |
Run JavaPortabilityApi PreCommit |
1 similar comment
Run JavaPortabilityApi PreCommit |
@mxm tests are now fixed. JavaPortabilityApi is failing consistently in pushDockerContainer, which seems unrelated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you. Looks good! I agree that the test failures look unrelated.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Outdated
Show resolved
Hide resolved
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
Run Java Flink PortableValidatesRunner Batch |
ac4dba3
to
15ecdc2
Compare
Adds the appropriate watermark holds in the flink runner