[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. #11514
inject_pcollection(mi)

return infos_list
pcollection_ids = self.process_bundle_descriptor.transforms[
I believe the id order aligns with the receiver order since transform_consumers built above iterates the outputs map in the same order and this gets plumbed down through to Operation.
In practice this might be OK (dict iteration order is undefined, though I think it is deterministic as long as the dict is not modified), but it seems rather brittle to me. Could we instead pass the tag -> pcollection_id mapping here?
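A minimal sketch of the concern, using plain dicts and strings in place of the real process_bundle_descriptor protos and receiver objects. The names pair_by_position, pair_by_tag, outputs and receivers_by_tag are hypothetical and exist only for this illustration; they are not part of the worker code.

```python
def pair_by_position(receivers, outputs):
    """Brittle: assumes receivers are ordered exactly like outputs.values()."""
    return list(zip(receivers, outputs.values()))


def pair_by_tag(receivers_by_tag, outputs):
    """Robust: looks up each receiver's PCollection id by its output tag."""
    return {
        tag: (receiver, outputs[tag])
        for tag, receiver in receivers_by_tag.items()
    }


# Hypothetical transform outputs: tag -> pcollection id.
outputs = {"main": "pcoll_1", "side": "pcoll_2"}

# Receivers that happen to be built in a different order than the outputs map.
receivers_in_other_order = ["receiver_side", "receiver_main"]
receivers_by_tag = {"side": "receiver_side", "main": "receiver_main"}

mispaired = pair_by_position(receivers_in_other_order, outputs)
paired = pair_by_tag(receivers_by_tag, outputs)
```

With positional pairing, any difference in ordering silently attaches a receiver to the wrong PCollection id, whereas the tag-keyed lookup does not depend on ordering at all.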
Run Portable_Python PreCommit
I suppose this would be to finish the transition from reporting counters on PTransform outputs to recording them on the various PCollections.
LGTM if you can change to use a mapping of tags to pcoll ids rather than relying on ordering being the same.
(self.receivers, pcollection_ids))

all_monitoring_infos = {}
for i in range(len(self.receivers)):
This will change if you use a mapping, but zip would be the idiom to use here.
+1 using zip
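For reference, a small sketch of the two loop styles being compared. The lists below are made-up stand-ins for self.receivers and pcollection_ids, not the actual worker objects.

```python
# Hypothetical stand-ins for self.receivers and pcollection_ids.
receivers = ["receiver_a", "receiver_b"]
pcollection_ids = ["pcoll_1", "pcoll_2"]

# Index-based loop, in the style of the diff under review.
by_index = {}
for i in range(len(receivers)):
    by_index[pcollection_ids[i]] = receivers[i]

# Equivalent loop using zip: no index bookkeeping.
by_zip = {}
for receiver, pcollection_id in zip(receivers, pcollection_ids):
    by_zip[pcollection_id] = receiver
```

Note that zip stops at the shorter input without raising, so a length check on the two sequences is still worth keeping alongside it.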
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]

if not ptransform_label in self.process_bundle_descriptor.transforms:
return
This would be a bug, right?
yeah
if not ptransform_label in self.process_bundle_descriptor.transforms:
return
if not tag_label in self.process_bundle_descriptor.transforms[
ptransform_label].outputs:
Likewise this.
Actually, this can happen, and might be what's happening here. There is no PCollection for this tag, but the user outputted a value to this tag. It would make sense to record this output even if we didn't use it. This is another downside of attaching these counters to PCollections themselves rather than to PTransform outputs.
unknown outputs should probably be reported another way
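A rough sketch of the lookup being discussed, assuming the descriptor can be modelled as a nested dict of transform id -> (tag -> pcollection id). The label keys and the helper resolve_pcollection_id are stand-ins invented for this example, not the names used in the worker.

```python
# Stand-in label keys, assumed for this sketch only.
PTRANSFORM_LABEL = "PTRANSFORM"
TAG_LABEL = "TAG"


def resolve_pcollection_id(labels, transforms):
    """Return the pcollection id for a monitoring info's labels, or None.

    None covers both cases raised in review: an unknown transform (likely
    a bug upstream) and a tag the user wrote to that has no PCollection.
    """
    outputs = transforms.get(labels.get(PTRANSFORM_LABEL))
    if outputs is None:
        return None
    return outputs.get(labels.get(TAG_LABEL))


# Hypothetical descriptor: transform id -> {tag -> pcollection id}.
transforms = {"t1": {"main": "pcoll_1"}}

known = resolve_pcollection_id(
    {PTRANSFORM_LABEL: "t1", TAG_LABEL: "main"}, transforms)
unused_tag = resolve_pcollection_id(
    {PTRANSFORM_LABEL: "t1", TAG_LABEL: "extra"}, transforms)
unknown_transform = resolve_pcollection_id(
    {PTRANSFORM_LABEL: "t2", TAG_LABEL: "main"}, transforms)
```

Returning None for the unused tag mirrors the early return in the diff; how such outputs should be reported instead is left open, as in the thread.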
I'll try it out, but I worry that the consumers/receivers use either indices or the post-conversion tag names, since Python does some post-processing that converts the string tags to non-string tags.
I looked through the implementation, and adding the pcollection id to the ConsumerSet doesn't seem to work out: operations don't have that level of visibility into the pipeline proto, and consumers work off of an index -> receiver map and expect tags to be mapped to indices, so we would need to go through all three layers. I suggest that we stick with this brittle approach until we can delete the non-portable Python worker implementation, which would make a lot of the layers simpler.
LGTM. Thanks for the fix.
Thanks, LGTM. Can't wait 'till we can get rid of the legacy worker and clean this up!
Run Python PreCommit
I suspect that this made precommits flaky.
…ing to fix them up. (apache#11514)
* [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.
* fixup! Convert to list since values() isn't subscriptable
* fixup! Use zip
* fixup! Migrate to use tag -> pcollection id
* fixup! lint
* fixup! Fix comparison