[BEAM-6733] Use Flink 1.6/1.7 prepareSnapshotPreBarrier to replace BufferedOutputManager #7940
Conversation
public void prepareSnapshotPreBarrier(long checkpointId) {
  // Finish the current bundle before the snapshot barrier is sent downstream.
  // This gives us a clean state before taking the actual snapshot.
  invokeFinishBundle();
I should highlight that this is the new method used to finish the bundle before the snapshot barrier is emitted and snapshot(..) gets called.
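To make the mechanics concrete, here is a minimal, self-contained sketch (not the actual Beam DoFnOperator; class and field names are illustrative) of an operator that flushes its in-flight bundle in prepareSnapshotPreBarrier:

```java
// Illustrative sketch only: mimics how a Flink (>= 1.6) operator can
// finish a bundle before the checkpoint barrier is emitted.
public class PreBarrierFlushSketch {
  // True while a bundle is open and may hold un-emitted output.
  boolean bundleInProgress;
  // Counts finished bundles (for demonstration only).
  int finishedBundles;

  // Flink calls this right before it emits the checkpoint barrier,
  // so the subsequent snapshot sees no in-flight bundle.
  public void prepareSnapshotPreBarrier(long checkpointId) {
    invokeFinishBundle();
  }

  void invokeFinishBundle() {
    if (bundleInProgress) {
      bundleInProgress = false;
      finishedBundles++;
      // In the real operator: emit pending outputs, call DoFn.finishBundle().
    }
  }
}
```

Because the bundle is closed before the barrier crosses the operator, no element can be emitted mid-snapshot, which is what makes the buffering below unnecessary.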
…fferedOutputManager For Flink version <= 1.5 the Flink Runner has to buffer any elements which are emitted during a snapshot because the barrier has already been emitted. This leads to increased code complexity. Flink version >= 1.6 provides a hook to execute an action before the snapshot barrier is emitted by the operator. We can remove the buffering in favor of finishing the current bundle in DoFnOperator's prepareSnapshotPreBarrier. The 1.5/1.6/1.7 build setup allows us to make this change with as little code duplication as possible.
}

/** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */
public static class FlinkOutputManager<OutputT> implements DoFnRunners.OutputManager {
The OutputManager could be simplified to remove any buffering. Note that we got rid of the BufferedOutputManager.
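Conceptually, the simplified manager just forwards (a sketch with simplified interfaces, not Beam's real DoFnRunners.OutputManager API):

```java
import java.util.function.Consumer;

// Illustrative sketch: with the bundle flushed in prepareSnapshotPreBarrier,
// the output manager can forward every element straight to Flink's output
// collector instead of buffering during checkpoints.
class ForwardingOutputManagerSketch<T> {
  // Stands in for Flink's Output<StreamRecord<T>> in this sketch.
  private final Consumer<T> downstream;

  ForwardingOutputManagerSketch(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  // No buffering branch, no checkpoint flag: just forward.
  void output(T value) {
    downstream.accept(value);
  }
}
```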
Not sure I like duplicating DoFnOperator and then keeping multiple copies in sync. Can we defer this change until we remove 1.5.x support?
There is only one copy. I didn't want to copy DoFnOperator, but it proved hard to simplify it without a copy. The good thing is that we have tests in place for 1.5/1.6/1.7 which ensure that nothing breaks across these versions. In my opinion, this change gives us more incentive to drop 1.5 support. Realistically, it will be hard to support multiple Flink versions without code divergence.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
…OutputManager For Flink version <= 1.5 the Flink Runner had to buffer any elements which are emitted during a snapshot because the barrier has already been emitted. Flink version >= 1.6 provides a hook to execute an action before the snapshot barrier is emitted by the operator. We can remove the buffering in favor of finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier. This had previously been deferred (apache#7940) until removal of Flink 1.5 (apache#9632).
…ting We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect the checkpoint performance. As a better migration path, this changes the default from buffering during checkpointing to flushing before checkpointing while still retaining the option to use the previous buffering behavior via a pipeline option.
…ting We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect the checkpoint performance. As a better migration path, this introduces a pipeline option to change the default, buffering bundle output during checkpointing, to finishing the bundle and flushing all data before checkpointing.
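As a sketch of what such an opt-in migration path might look like (the option name here is illustrative; check Beam's FlinkPipelineOptions for the real flag):

```java
// Illustrative sketch of the migration-path option described above:
// the default keeps the old buffering behavior, and setting the flag
// switches to finishing the bundle before the checkpoint barrier.
class CheckpointBehaviorOptionsSketch {
  private boolean finishBundleBeforeCheckpointing = false; // old behavior by default

  boolean getFinishBundleBeforeCheckpointing() {
    return finishBundleBeforeCheckpointing;
  }

  void setFinishBundleBeforeCheckpointing(boolean value) {
    finishBundleBeforeCheckpointing = value;
  }

  // Describes which strategy the runner would use with this setting.
  String checkpointStrategy() {
    return finishBundleBeforeCheckpointing
        ? "flush bundle before barrier"
        : "buffer output during checkpoint";
  }
}
```

Keeping the old behavior reachable behind a flag lets users fall back if flushing before the barrier turns out to hurt their checkpoint latency.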
* Support ZetaSQL DATE type as a Beam LogicalType
* [BEAM-6733] Add pipeline option to flush bundle data before checkpointing
  We had a couple of PRs in which we wanted to remove the buffering of bundle output during checkpointing: #7940 #9652. Ultimately, we didn't merge any of those because we weren't sure how the change would affect the checkpoint performance. As a better migration path, this introduces a pipeline option to change the default, buffering bundle output during checkpointing, to finishing the bundle and flushing all data before checkpointing.
* Remove all answer placeholder checks as they can be confusing at times for some learners
* Update course in Stepik
* [BEAM-10018] Fix timestamps in windowing kata
  In this Kata, the timestamp was calculated from time objects and converted to a timestamp in the local timezone. Thus, the results of the test depended on the configuration of the local timezone in the running system. The tests were hardcoded with a timezone different from mine, and thus I always failed to pass this Kata. The changes in this commit change the type in Event to be a datetime, the timestamps are set in UTC, and the output in the tests is hardcoded in UTC too. This should ensure that the kata works regardless of the timezone configured in the system running the kata.
* [BEAM-10018] Kata failing due to failed parsing
  Parsing the timestamps as strings using fromisoformat was failing, and the Kata failed silently regardless of the code written in the boxes. This change sets the same timestamps, with UTC timezone, without parsing strings.
* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"
* Remove unused import
* Add missing dependency
* Fix member variable name in Kata documentation
* Fix placeholder location
* Convert html task description to md for "Core Transforms" remaining lessons
* Convert html task description to md for "Common Transforms" lessons
* Convert html task description to md for remaining Python Katas lessons
* Convert html task description to md for most of Java Katas lessons
* Convert html task description to md for Java Katas "Common Transforms" lessons
* Convert html task description to md for Java Katas "Core Transforms" lessons
* [BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11 (#11692)
* Python3 fix - convert dict.keys() to list before indexing (#11733)
* Updates google-apitools and httplib2 (#11726)
* [BEAM-9964] Update CHANGES.md (#11743)
  Co-authored-by: Omar Ismail <[email protected]>
* [BEAM-9577] Artifact v2 support for uber jars. (#11708)
  Adds a "filesystem" for artifacts placed on the classpath (e.g. within the uberjar). Updates the flink and spark uberjars to use artifact staging v2, leveraging the above filesystem.
* Populate all SpannerIO batching parameters in display data.
  Add all the grouping/batching parameters in SpannerIO populateDisplayData().
* Fix capitalization, clarify descriptions
* Fix capitalization, clarify description
  Grouped
* Refactor to extract single method for populating displayData
* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"
* Convert html task description to md for "Core Transforms" remaining lessons
* Convert html task description to md for "Common Transforms" lessons
* Convert html task description to md for remaining Python Katas lessons
* Convert html task description to md for most of Java Katas lessons
* Convert html task description to md for Java Katas "Common Transforms" lessons
* Convert html task description to md for Java Katas "Core Transforms" lessons
* Resolve merge conflict
* Update Python Katas on Stepik
* Update Beam Katas Java on Stepik
Co-authored-by: Yueyang Qiu <[email protected]>
Co-authored-by: Maximilian Michels <[email protected]>
Co-authored-by: Israel Herraiz <[email protected]>
Co-authored-by: pawelpasterz <[email protected]>
Co-authored-by: Chamikara Jayalath <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
Co-authored-by: Pablo <[email protected]>
Co-authored-by: omarismail94 <[email protected]>
Co-authored-by: Omar Ismail <[email protected]>
Co-authored-by: Andrew Pilloud <[email protected]>
Co-authored-by: Robert Bradshaw <[email protected]>
Co-authored-by: nielm <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
For Flink version <= 1.5 the Flink Runner has to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted. This
leads to considerable code complexity.
Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in DoFnOperator's prepareSnapshotPreBarrier. The
1.5/1.6/1.7 build setup allows us to make this change with as little code
duplication as possible.
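For contrast, the pre-1.6 workaround that this removes can be sketched as follows (a simplified, hypothetical class; the real BufferedOutputManager also spills buffered elements to managed state for fault tolerance):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of the Flink <= 1.5 workaround: the barrier has
// already gone downstream when the snapshot runs, so any elements emitted
// while snapshotting must be held back and re-emitted afterwards.
class BufferedEmitterSketch<T> {
  private final List<T> buffer = new ArrayList<>();
  private final Consumer<T> downstream;
  private boolean snapshotting;

  BufferedEmitterSketch(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  void startSnapshot() {
    snapshotting = true;
  }

  void emit(T value) {
    if (snapshotting) {
      buffer.add(value); // must not cross the already-emitted barrier
    } else {
      downstream.accept(value);
    }
  }

  void finishSnapshot() {
    snapshotting = false;
    buffer.forEach(downstream); // replay once the snapshot is done
    buffer.clear();
  }
}
```

With prepareSnapshotPreBarrier, the bundle is finished before the barrier is emitted, so this buffer, its state-backed persistence, and the replay logic can all be deleted.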
Post-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for the trigger phrase, status, and link of each Jenkins job.