Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6733] Use Flink 1.6/1.7 prepareSnapshotPreBarrier to replace BufferedOutputManager #7940

Closed
wants to merge 1 commit into from

Conversation

mxm
Copy link
Contributor

@mxm mxm commented Feb 25, 2019

For Flink version <= 1.5 the Flink Runner has to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted. This
leads to a lot of code complexity.

Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in DoFnOperator's prepareSnapshotPreBarrier. The
1.5/1.6/1.7 build setup allows us to make this change with as little code
duplication as possible.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Python Build Status
Build Status
--- Build Status
Build Status
Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@mxm
Copy link
Contributor Author

mxm commented Feb 25, 2019

R @tweise
CC @aljoscha

@mxm mxm requested a review from tweise February 25, 2019 17:55
public void prepareSnapshotPreBarrier(long checkpointId) {
// Finish the current bundle before the snapshot barrier is sent downstream
// This give us a clean state before taking the actual snapshot
invokeFinishBundle();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should highlight that this is the new method used to finish the bundle before the snapshot barrier is emitted and snapshot(..) gets called.

…fferedOutputManager

For Flink version <= 1.5 the Flink Runner has to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted. This
leads to increased code complexity.

Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in DoFnOperator's prepareSnapshotPreBarrier. The
1.5/1.6/1.7 build setup allows us to make this change with as little code
duplication as possible.
}

/** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */
public static class FlinkOutputManager<OutputT> implements DoFnRunners.OutputManager {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OutputManager could be simplified to remove any buffering.

@mxm
Copy link
Contributor Author

mxm commented Feb 25, 2019

Note that we got rid of FlinkSplitStateInternals and FlinkSplitStateInternalsTest entirely for Flink >= 1.6.

@tweise
Copy link
Contributor

tweise commented Feb 28, 2019

Not sure I like duplicating DoFnOperator and then keeping multiple copies in sync. Can we defer this change until we remove 1.5.x support?

@mxm
Copy link
Contributor Author

mxm commented Feb 28, 2019

There is only one copy. I didn't want to copy DoFnOperator but it proved hard to simplify it without a copy. The good thing is that we have tests in place for 1.5/1.6/1.7 which ensure that nothing breaks across these versions.

In my opinion this change gives us more insensitive to drop 1.5 support. Realistically, it will be hard to remain on multiple Flink versions without code diversion.

@stale
Copy link

stale bot commented Apr 29, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@stale stale bot added the stale label Apr 29, 2019
@stale
Copy link

stale bot commented May 6, 2019

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this May 6, 2019
mxm added a commit to mxm/beam that referenced this pull request Sep 24, 2019
…OutputManager

For Flink version <= 1.5 the Flink Runner had to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted.

Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier.

This had previously been deferred (apache#7940) until removal of Flink 1.5 (apache#9632).
mxm added a commit to lyft/beam that referenced this pull request Nov 1, 2019
…OutputManager

For Flink version <= 1.5 the Flink Runner had to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted.

Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier.

This had previously been deferred (apache#7940) until removal of Flink 1.5 (apache#9632).
mxm added a commit to lyft/beam that referenced this pull request Nov 1, 2019
…OutputManager

For Flink version <= 1.5 the Flink Runner had to buffer any elements which are
emitted during a snapshot because the barrier has already been emitted.

Flink version >= 1.6 provides a hook to execute an action before the snapshot
barrier is emitted by the operator. We can remove the buffering in favor of
finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier.

This had previously been deferred (apache#7940) until removal of Flink 1.5 (apache#9632).
mxm added a commit to mxm/beam that referenced this pull request May 12, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this changes the default from buffering during
checkpointing to flushing before checkpointing while still retaining the option
to use the previous buffering behavior via a pipeline option.
mxm added a commit to mxm/beam that referenced this pull request May 15, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this changes the default from buffering during
checkpointing to flushing before checkpointing while still retaining the option
to use the previous buffering behavior via a pipeline option.
mxm added a commit to mxm/beam that referenced this pull request May 15, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.
mxm added a commit to mxm/beam that referenced this pull request May 15, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.
mxm added a commit to mxm/beam that referenced this pull request May 15, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.
pabloem added a commit that referenced this pull request May 19, 2020
* Support ZetaSQL DATE type as a Beam LogicalType

* [BEAM-6733] Add pipeline option to flush bundle data before checkpointing

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: #7940 #9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.

* Remove all answer placeholder checks as they can be confusing at times for some learners

* Update course in Stepik

* [BEAM-10018] Fix timestamps in windowing kata

In this Kata, the timestamp was calculated from time objects, and converted to a
timestamp in the local timezone. Thus, the results of the test depended on the
configuration of the local timezone in the running system.

The tests were hardcoded with a timezone different to mine, and thus I always
failed to pass this Kata. The changes in this commit change the type in Event to
be a datetime, the timestamps are set in UTC, and the output in the tests is
hardcoded in UTC too. This should ensure that the kata works regardless the
timezone configured in the system running the kata.

* [BEAM-10018] Kata failing due to failed parsing

Parsing the timestamps as strings using fromisoformat was failing, and the Kata
failed silently regardless the code written in the boxes.

This change sets the same timestamps, with UTC timezone, without parsing
strings.

* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"

* Remove unused import

* Add missing dependency

* Fix member variable name in Kata documentation

* Fix placeholder location

* Convert html task description to md for "Core Transforms" remaining lessons

* Convert html task description to md for "Common Transforms" lessons

* Convert html task description to md for remaining Python Katas lessons

* Convert html task description to md for most of Java Katas lessons

* Convert html task description to md for Java Katas "Common Transforms" lessons

* Convert html task description to md for Java Katas "Core Transforms" lessons

* [BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11 (#11692)

[BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11

* Python3 fix - convert dict.keys() to list before indexing (#11733)

* Updates google-apitools and httplib2 (#11726)

* [BEAM-9964] Update CHANGES.md  (#11743)

Co-authored-by: Omar Ismail <[email protected]>

* [BEAM-9577] Artifact v2 support for uber jars. (#11708)

* Adds a "filesystem" for artifacts placed on the classpath (e.g. within the uberjar).
* Updates the flink and spark uberjars to use artifact staging v2, leveraging the above filesystem.

* Populate all SpannerIO batching parameters in display data.

Add all the grouping/batching parameters in SpannerIO
populateDisplayData().

* Fix capitalization, clarify descriptions

* fix capitalization, clarify description Grouped

* Refactor to extract single method for popuplating displayData

* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"

* Convert html task description to md for "Core Transforms" remaining lessons

* Convert html task description to md for "Common Transforms" lessons

* Convert html task description to md for remaining Python Katas lessons

* Convert html task description to md for most of Java Katas lessons

* Convert html task description to md for Java Katas "Common Transforms" lessons

* Convert html task description to md for Java Katas "Core Transforms" lessons

* Resolve merge conflict

* Update Python Katas on Stepik

* Update Beam Katas Java on Stepik

Co-authored-by: Yueyang Qiu <[email protected]>
Co-authored-by: Maximilian Michels <[email protected]>
Co-authored-by: Israel Herraiz <[email protected]>
Co-authored-by: pawelpasterz <[email protected]>
Co-authored-by: Chamikara Jayalath <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
Co-authored-by: Pablo <[email protected]>
Co-authored-by: omarismail94 <[email protected]>
Co-authored-by: Omar Ismail <[email protected]>
Co-authored-by: Andrew Pilloud <[email protected]>
Co-authored-by: Robert Bradshaw <[email protected]>
Co-authored-by: nielm <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
…ting

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.
yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
* Support ZetaSQL DATE type as a Beam LogicalType

* [BEAM-6733] Add pipeline option to flush bundle data before checkpointing

We had a couple of PRs in which we wanted to remove the buffering of bundle
output during checkpointing: apache#7940 apache#9652. Ultimately, we didn't merge any of
those because we weren't sure how the change would affect the checkpoint
performance.

As a better migration path, this introduces a pipeline option to change the
default, buffering bundle output during checkpointing, to finishing the bundle
and flushing all data before checkpointing.

* Remove all answer placeholder checks as they can be confusing at times for some learners

* Update course in Stepik

* [BEAM-10018] Fix timestamps in windowing kata

In this Kata, the timestamp was calculated from time objects, and converted to a
timestamp in the local timezone. Thus, the results of the test depended on the
configuration of the local timezone in the running system.

The tests were hardcoded with a timezone different to mine, and thus I always
failed to pass this Kata. The changes in this commit change the type in Event to
be a datetime, the timestamps are set in UTC, and the output in the tests is
hardcoded in UTC too. This should ensure that the kata works regardless the
timezone configured in the system running the kata.

* [BEAM-10018] Kata failing due to failed parsing

Parsing the timestamps as strings using fromisoformat was failing, and the Kata
failed silently regardless the code written in the boxes.

This change sets the same timestamps, with UTC timezone, without parsing
strings.

* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"

* Remove unused import

* Add missing dependency

* Fix member variable name in Kata documentation

* Fix placeholder location

* Convert html task description to md for "Core Transforms" remaining lessons

* Convert html task description to md for "Common Transforms" lessons

* Convert html task description to md for remaining Python Katas lessons

* Convert html task description to md for most of Java Katas lessons

* Convert html task description to md for Java Katas "Common Transforms" lessons

* Convert html task description to md for Java Katas "Core Transforms" lessons

* [BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11 (apache#11692)

[BEAM-2530] Implement Zeta SQL precommit compile tests and run on java 11

* Python3 fix - convert dict.keys() to list before indexing (apache#11733)

* Updates google-apitools and httplib2 (apache#11726)

* [BEAM-9964] Update CHANGES.md  (apache#11743)

Co-authored-by: Omar Ismail <[email protected]>

* [BEAM-9577] Artifact v2 support for uber jars. (apache#11708)

* Adds a "filesystem" for artifacts placed on the classpath (e.g. within the uberjar).
* Updates the flink and spark uberjars to use artifact staging v2, leveraging the above filesystem.

* Populate all SpannerIO batching parameters in display data.

Add all the grouping/batching parameters in SpannerIO
populateDisplayData().

* Fix capitalization, clarify descriptions

* fix capitalization, clarify description Grouped

* Refactor to extract single method for popuplating displayData

* Convert html task description to md for "Hello Beam" and "Core Transforms/Map"

* Convert html task description to md for "Core Transforms" remaining lessons

* Convert html task description to md for "Common Transforms" lessons

* Convert html task description to md for remaining Python Katas lessons

* Convert html task description to md for most of Java Katas lessons

* Convert html task description to md for Java Katas "Common Transforms" lessons

* Convert html task description to md for Java Katas "Core Transforms" lessons

* Resolve merge conflict

* Update Python Katas on Stepik

* Update Beam Katas Java on Stepik

Co-authored-by: Yueyang Qiu <[email protected]>
Co-authored-by: Maximilian Michels <[email protected]>
Co-authored-by: Israel Herraiz <[email protected]>
Co-authored-by: pawelpasterz <[email protected]>
Co-authored-by: Chamikara Jayalath <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
Co-authored-by: Pablo <[email protected]>
Co-authored-by: omarismail94 <[email protected]>
Co-authored-by: Omar Ismail <[email protected]>
Co-authored-by: Andrew Pilloud <[email protected]>
Co-authored-by: Robert Bradshaw <[email protected]>
Co-authored-by: nielm <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants