Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9624] Adds Convert to Accumulators operator for use in combiner lifting for streaming pipelines #11271

Merged
merged 8 commits into from
Apr 2, 2020

Conversation

acrites
Copy link
Contributor

@acrites acrites commented Mar 30, 2020

The Convert To Accumulators operation is defined here:

https://s.apache.org/beam-runner-api-combine-model#heading=h.h5697l1scd9x

It is used by streaming pipelines where we want to lift the combiner into the GBK, but don't want to also lift the combiner into a PGBK before the GBK. This PGBK can behave badly with certain triggers.

This PR also adds an implementation of Convert to Accumulators for the Go, Java, and Python SDKs.

R: @robertwb, @lukecwik, @lostluck

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a place where you could add a Python test that was similar to the Go/Java ones.

@udim Any suggestions?

@lukecwik
Copy link
Member

retest this please

@lukecwik
Copy link
Member

Java Precommit failed due to:

19:54:56 > Task :sdks:java:harness:compileTestJava
19:54:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java:350: error: method createRunnerForPTransform in interface PTransformRunnerFactory<T> cannot be applied to given types;
19:54:56         .createRunnerForPTransform(
19:54:56         ^
19:54:56   required: PipelineOptions,BeamFnDataClient,BeamFnStateClient,String,PTransform,Supplier<String>,Map<String,PCollection>,Map<String,Coder>,Map<String,WindowingStrategy>,PCollectionConsumerRegistry,PTransformFunctionRegistry,PTransformFunctionRegistry,Consumer<ThrowingRunnable>,BundleSplitListener,BundleFinalizer
19:54:56   found: PipelineOptions,<null>,<null>,String,PTransform,<null>,Map<Object,Object>,Map<Object,Object>,Map<Object,Object>,PCollectionConsumerRegistry,PTransformFunctionRegistry,PTransformFunctionRegistry,<null>,<null>
19:54:56   reason: actual and formal argument lists differ in length
19:54:56   where T is a type-variable:
19:54:56     T extends Object declared in interface PTransformRunnerFactory

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than a missing test that Luke's already suggested, LGTM for Go. The implementation matches the spec description.

@acrites
Copy link
Contributor Author

acrites commented Apr 1, 2020

I fixed the problem in :sdks:java:harness:compileTestJava. It turns out my branch was old and createRunnerForPTransform only had 14 parameters, but now it's 15.

@lukecwik
Copy link
Member

lukecwik commented Apr 1, 2020

retest this please

@lukecwik
Copy link
Member

lukecwik commented Apr 1, 2020

retest this please

1 similar comment
@lukecwik
Copy link
Member

lukecwik commented Apr 2, 2020

retest this please

@lukecwik lukecwik merged commit 9e01c5a into apache:master Apr 2, 2020
@udim
Copy link
Member

udim commented Apr 8, 2020

I didn't see a place where you could add a Python test that was similar to the Go/Java ones.

@udim Any suggestions?

My suggestion is to look at combiners_test.py and add the appropriate test there.

@acrites
Copy link
Contributor Author

acrites commented Apr 8, 2020

The main issue is that we need to be able to test one of the pieces of the CombinePerKey operation (ConvertToAccumulators) independently of the others like MergeAccumulators and Extract. In Go and Java there were already some test frameworks in place to do that, but in Python we can only test the entire thing.

@acrites acrites deleted the convert-to-accumulators branch February 5, 2024 20:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants