-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. #11514
Changes from all commits
96cc611
808b4b9
04c6334
051c094
4c03658
1297a0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1035,59 +1035,12 @@ def monitoring_infos(self): | |
# Construct a new dict first to remove duplicates. | ||
all_monitoring_infos_dict = {} | ||
for transform_id, op in self.ops.items(): | ||
for mi in op.monitoring_infos(transform_id).values(): | ||
fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi) | ||
all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi | ||
|
||
infos_list = list(all_monitoring_infos_dict.values()) | ||
|
||
def inject_pcollection(monitoring_info): | ||
""" | ||
If provided metric is element count metric: | ||
Finds relevant transform output info in current process_bundle_descriptor | ||
and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring | ||
info. | ||
""" | ||
if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS: | ||
if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels: | ||
return | ||
ptransform_label = monitoring_info.labels[ | ||
monitoring_infos.PTRANSFORM_LABEL] | ||
if not monitoring_infos.TAG_LABEL in monitoring_info.labels: | ||
return | ||
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL] | ||
|
||
if not ptransform_label in self.process_bundle_descriptor.transforms: | ||
return | ||
if not tag_label in self.process_bundle_descriptor.transforms[ | ||
ptransform_label].outputs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, this can happen, and might be what's happening here. There is no PCollection for this tag, but the user outputted a value to this tag. It would make sense to record this output even if we didn't use it. This is another downside of attaching these counters to PCollections themselves rather than to PTransform outputs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unknown outputs should probably be reported another way |
||
return | ||
|
||
pcollection_name = ( | ||
self.process_bundle_descriptor.transforms[ptransform_label]. | ||
outputs[tag_label]) | ||
|
||
monitoring_info.labels[ | ||
monitoring_infos.PCOLLECTION_LABEL] = pcollection_name | ||
|
||
# Cleaning up labels that are not in specification. | ||
monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL) | ||
monitoring_info.labels.pop(monitoring_infos.TAG_LABEL) | ||
|
||
for mi in infos_list: | ||
inject_pcollection(mi) | ||
|
||
return infos_list | ||
tag_to_pcollection_id = self.process_bundle_descriptor.transforms[ | ||
transform_id].outputs | ||
all_monitoring_infos_dict.update( | ||
op.monitoring_infos(transform_id, tag_to_pcollection_id)) | ||
|
||
def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info): | ||
# type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo | ||
actual_output_tags = list( | ||
self.process_bundle_descriptor.transforms[transform_id].outputs.keys()) | ||
if ('TAG' in monitoring_info.labels and | ||
monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'): | ||
if len(actual_output_tags) == 1: | ||
monitoring_info.labels['TAG'] = actual_output_tags[0] | ||
return monitoring_info | ||
return list(all_monitoring_infos_dict.values()) | ||
|
||
def shutdown(self): | ||
# type: () -> None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a bug, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah