-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. #11670
Conversation
…d runner implemented transforms) have an environment id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved for Go.
Run Go Flink ValidatesRunner |
Run Go Flink ValidatesRunner |
Run Python PreCommit |
public class PipelineTrimmer { | ||
private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class); | ||
/** | ||
* TrivialNativeTransformExpander is used to replace transforms with known URNs with their native |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "Trivial" mean here? Are there "Native" transforms that are not "Trivial"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not currently but the "trivial" is to imply that you don't need anything other then payload on the transform itself and that you don't need to inspect the transform or any of its children when constructing the native transform.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "trivial" is meant to apply to the expander part.
@@ -375,6 +375,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string { | |||
payload := &pipepb.WindowIntoPayload{ | |||
WindowFn: makeWindowFn(edge.Edge.WindowFn), | |||
} | |||
transformEnvID = m.addDefaultEnv() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be native as well, right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, Window.into
requires execution of assignWindows
that is part of the windowing fn (which could be a well known window fn or a custom user window fn).
It could be lifted into the runner if it understands the windowing fn but that could break fusion since Window.into
may occur between ParDo
s and far away from GroupByKey
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok. I read this as GBK by mistake. Can we do this in a common place and skip the two known runner implemented transforms similar to other SDKs ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
if (knownUrns.contains( | ||
pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) { | ||
LOG.debug("Removing descendants of known PTransform {}" + ptransformId); | ||
// Skip over previously removed transforms from the original pipeline. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should expand the comment here to describe why this is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -375,6 +375,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string { | |||
payload := &pipepb.WindowIntoPayload{ | |||
WindowFn: makeWindowFn(edge.Edge.WindowFn), | |||
} | |||
transformEnvID = m.addDefaultEnv() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok. I read this as GBK by mistake. Can we do this in a common place and skip the two known runner implemented transforms similar to other SDKs ?
@@ -123,20 +123,16 @@ class Pipeline(object): | |||
should be used to designate new names | |||
(e.g. ``input | "label" >> my_transform``). | |||
""" | |||
|
|||
# TODO: BEAM-9001 - set environment ID in all transforms and allow runners to | |||
# override. | |||
@classmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add an assert similar to PipelineValidator.java ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you suggest the assert go?
The PipelineValidator does the assertion since it is used by the Runner once the entire pipeline is constructed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we can introduce a new visitor and update runners to use that similar to following ?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L557
But this is not a blocker.
Run Java PreCommit |
Run Python PreCommit |
1 similar comment
Run Python PreCommit |
@@ -213,6 +213,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string { | |||
transform := &pipepb.PTransform{ | |||
UniqueName: s.Scope.Name, | |||
Subtransforms: subtransforms, | |||
EnvironmentId: m.addDefaultEnv(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, it's now the case that Composite Transforms should have environments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
It was to cover the case where the runner knows what the composite transform means such as a combiner and lifts it appropriately.
Run Python PreCommit |
Is this ready to be merged? This is the only blocker left on the release. This change is large for a cherry pick. What would be impacted, what tests need to be run? If we cherry pick this to the release branch there will be a risk of introducing other issues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
@@ -123,20 +123,16 @@ class Pipeline(object): | |||
should be used to designate new names | |||
(e.g. ``input | "label" >> my_transform``). | |||
""" | |||
|
|||
# TODO: BEAM-9001 - set environment ID in all transforms and allow runners to | |||
# override. | |||
@classmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we can introduce a new visitor and update runners to use that similar to following ?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L557
But this is not a blocker.
We only would need to cherry pick the Python portion of this change which is one file. The Java/Go portion aren't important for 2.21. |
Thank you! |
…d runner implemented transforms) have an environment id. (apache#11670) * [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. * fixup! Fix native transform expander to not reinsert deleted transforms. * fixup! Address chamikara's PR comments
…d runner implemented transforms) have an environment id. (apache#11670) * [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. * fixup! Fix native transform expander to not reinsert deleted transforms. * fixup! Address chamikara's PR comments
…d runner implemented transforms) have an environment id. (apache#11670) * [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. * fixup! Fix native transform expander to not reinsert deleted transforms. * fixup! Address chamikara's PR comments
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.