Beam-2535: Support timer output timestamps in the flink runner #10534

Merged: 3 commits, Jan 22, 2020

reuvenlax
Contributor

Adds the appropriate watermark holds in the Flink runner.

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@reuvenlax reuvenlax requested a review from mxm January 8, 2020 20:39
@reuvenlax
Contributor Author

R: xubii

@reuvenlax
Contributor Author

Some context: pr/9677 allows users to set their own watermark holds by tying them to the output timestamp of a timer. This prevents watermark holds from being "leaked", because the holds are removed when the timer fires. This PR adds that support to the Flink runner, which was excluded from the previous PR.
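
As a rough illustration of the idea described above (a toy model, not Beam or Flink runner code; the class and method names are hypothetical), a hold can be stored per timer at the timer's output timestamp, so that removing the timer when it fires also removes the hold:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of timer-scoped watermark holds; not Beam or Flink runner code. */
final class TimerHoldSketch {
  // Hypothetical bookkeeping: timer id -> output timestamp (epoch millis) acting as a hold.
  private final Map<String, Long> holdsByTimerId = new HashMap<>();

  /** Setting a timer registers a hold at its output timestamp, replacing any earlier one. */
  void setTimer(String timerId, long outputTimestampMillis) {
    holdsByTimerId.put(timerId, outputTimestampMillis);
  }

  /** Firing a timer removes its hold, so the hold cannot outlive the timer. */
  void fireTimer(String timerId) {
    holdsByTimerId.remove(timerId);
  }

  /** The output watermark may not pass the earliest hold still in place. */
  long outputWatermark(long inputWatermarkMillis) {
    long watermark = inputWatermarkMillis;
    for (long hold : holdsByTimerId.values()) {
      watermark = Math.min(watermark, hold);
    }
    return watermark;
  }

  public static void main(String[] args) {
    TimerHoldSketch state = new TimerHoldSketch();
    state.setTimer("flush", 1_000L);
    // Held back by the pending timer's output timestamp.
    System.out.println(state.outputWatermark(5_000L));
    state.fireTimer("flush");
    // The hold was released when the timer fired.
    System.out.println(state.outputWatermark(5_000L));
  }
}
```

Because the hold lives and dies with its timer in this sketch, there is no separate hold state that user code could forget to clear.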

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@reuvenlax
Contributor Author

@mxm this doesn't quite work. Presumably I'm misunderstanding something about how the Flink runner works. Do you have any thoughts on how this might be implemented?

@mxm
Contributor

mxm commented Jan 9, 2020

Thanks @reuvenlax. I'll have a look later today.

@mxm
Contributor

mxm commented Jan 9, 2020

Run Flink ValidatesRunner

@mxm
Contributor

mxm commented Jan 9, 2020

Run Java Flink PortableValidatesRunner Streaming

@@ -1171,6 +1173,8 @@ private void cancelPendingTimerById(String contextTimerId) throws Exception {
    void cleanupPendingTimer(TimerData timer) {
      try {
        pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
        // Always update watermark hold after a timer has been fired or removed
        updateWatermarkHold();

Contributor

This was the missing piece. Without this, we can't make progress when a timer has fired.

Contributor Author

Thanks! I didn't put a call here because, the way the code is structured, that would result in two calls to updateWatermarkHold every time you set a timer. Would it be sufficient to add the call in fireTimer, since that appears to be the path we missed?

Contributor

Yes, that's right. My first thought was to add it in fireTimer, but then I thought I could simplify it further. That led to the duplicate update call when you set the timer. Looks like we just have to update the hold on set, delete, and fire, and not on cleanup.
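
To make the trade-off in this thread concrete, here is a minimal sketch under stated assumptions. It borrows the names `pendingTimersById`, `cleanupPendingTimer`, `fireTimer`, and `updateWatermarkHold` from the discussion, but every method body, the `setTimer`/`deleteTimer` entry points, and the `holdUpdates` counter are hypothetical and do not reflect the actual Flink runner code. It assumes that setting a timer first cleans up any pending timer with the same id, which is why an update inside cleanup would run twice per set:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of where the hold gets recomputed; not the Flink runner implementation. */
final class HoldUpdateSketch {
  // Hypothetical state: timer id -> output timestamp (epoch millis).
  private final Map<String, Long> pendingTimersById = new HashMap<>();
  private long watermarkHold = Long.MAX_VALUE;
  int holdUpdates = 0; // counts recomputations, only to make duplicates visible

  void setTimer(String id, long outputTimestampMillis) {
    cleanupPendingTimer(id); // drop any earlier timer registered under this id
    pendingTimersById.put(id, outputTimestampMillis);
    updateWatermarkHold(); // exactly one update per set
  }

  void deleteTimer(String id) {
    cleanupPendingTimer(id);
    updateWatermarkHold();
  }

  void fireTimer(String id) {
    cleanupPendingTimer(id);
    updateWatermarkHold(); // without this, the hold of a fired timer would never be released
  }

  // Bookkeeping only: no hold update here, so setTimer does not update twice.
  private void cleanupPendingTimer(String id) {
    pendingTimersById.remove(id);
  }

  // Recompute the hold as the earliest output timestamp among pending timers.
  private void updateWatermarkHold() {
    holdUpdates++;
    long earliest = Long.MAX_VALUE;
    for (long timestamp : pendingTimersById.values()) {
      earliest = Math.min(earliest, timestamp);
    }
    watermarkHold = earliest;
  }

  long currentHold() {
    return watermarkHold;
  }
}
```

In this sketch each of set, delete, and fire triggers exactly one recomputation, and the hold returns to `Long.MAX_VALUE` (no hold) once no timers are pending.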

@mxm
Contributor

mxm commented Jan 9, 2020

@mxm
Contributor

mxm commented Jan 9, 2020

Run Flink ValidatesRunner

@mxm
Contributor

mxm commented Jan 9, 2020

The portable ValidatesRunner tests are failing. We need to adapt the watermark handling there. Let me check.

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@reuvenlax
Contributor Author

Run Apex ValidatesRunner

@reuvenlax
Contributor Author

Run Samza ValidatesRunner

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

This appears to work except for the portable runner

@mxm
Contributor

mxm commented Jan 10, 2020

This is an issue with the portable TestStream implementation not working with non-standard coders. Let me see if I can find a fix.

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@mxm
Contributor

mxm commented Jan 15, 2020

Run Java PreCommit

@reuvenlax
Contributor Author

Run Flink ValidatesRunner

@reuvenlax
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Contributor Author

@mxm I believe this now supports both the old and the portable Flink runners.

@mxm
Contributor

mxm commented Jan 16, 2020

Run Java PreCommit

@mxm
Contributor

mxm commented Jan 16, 2020

Run JavaPortabilityApi PreCommit

@mxm
Contributor

mxm commented Jan 16, 2020

There are still some test failures:
https://builds.apache.org/job/beam_PreCommit_Java_Phrase/1651/
https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Phrase/175/

After those are fixed please revert #10610 in this PR to reactivate the new timer output timestamp test.

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

Run JavaPortabilityApi PreCommit

@reuvenlax
Contributor Author

@mxm tests are now fixed. JavaPortabilityApi is failing consistently in pushDockerContainer, which seems unrelated.

Contributor

@mxm mxm left a comment

Thank you. Looks good! I agree that the test failures look unrelated.

@mxm
Contributor

mxm commented Jan 22, 2020

Run Flink ValidatesRunner

@mxm
Contributor

mxm commented Jan 22, 2020

Run Java Flink PortableValidatesRunner Streaming

@mxm
Contributor

mxm commented Jan 22, 2020

Run Java Flink PortableValidatesRunner Batch

@reuvenlax reuvenlax merged commit dd0b001 into apache:master Jan 22, 2020