From 3e62a5ba2f9ed7451d7c0ade81f19488718ffc8d Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Sat, 25 Apr 2020 21:52:20 -0700 Subject: [PATCH] [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514) * [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. * fixup! Convert to list since values() isn't subscriptable * fixup! Use zip * fixup! Migrate to use tag -> pcollection id * fixup! lint * fixup! Fix comparison --- .../apache_beam/metrics/monitoring_infos.py | 60 +++++----- .../runners/worker/bundle_processor.py | 57 +-------- .../apache_beam/runners/worker/operations.pxd | 4 +- .../apache_beam/runners/worker/operations.py | 110 ++++++++++-------- 4 files changed, 94 insertions(+), 137 deletions(-) diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 2d1524cef33e3..92eb9decdd24b 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -78,7 +78,6 @@ NAMESPACE_LABEL = ( common_urns.monitoring_info_labels.NAMESPACE.label_props.name) NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name) -TAG_LABEL = "TAG" def extract_counter_value(monitoring_info_proto): @@ -113,26 +112,26 @@ def extract_distribution(monitoring_info_proto): coders.VarIntCoder(), monitoring_info_proto.payload) -def create_labels(ptransform=None, tag=None, namespace=None, name=None): - """Create the label dictionary based on the provided tags. +def create_labels(ptransform=None, namespace=None, name=None, pcollection=None): + """Create the label dictionary based on the provided values. Args: - ptransform: The ptransform/step name. - tag: he output tag name, used as a label. + ptransform: The ptransform id used as a label. + pcollection: The pcollection id used as a label. """ labels = {} - if tag: - labels[TAG_LABEL] = tag if ptransform: labels[PTRANSFORM_LABEL] = ptransform if namespace: labels[NAMESPACE_LABEL] = namespace if name: labels[NAME_LABEL] = name + if pcollection: + labels[PCOLLECTION_LABEL] = pcollection return labels -def int64_user_counter(namespace, name, metric, ptransform=None, tag=None): +def int64_user_counter(namespace, name, metric, ptransform=None): # type: (...) -> metrics_pb2.MonitoringInfo """Return the counter monitoring info for the specifed URN, metric and labels. @@ -140,18 +139,16 @@ def int64_user_counter(namespace, name, metric, ptransform=None, tag=None): Args: urn: The URN of the monitoring info/metric. metric: The payload field to use in the monitoring info or an int value. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. """ - labels = create_labels( - ptransform=ptransform, tag=tag, namespace=namespace, name=name) + labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) if isinstance(metric, int): metric = coders.VarIntCoder().encode(metric) return create_monitoring_info( USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels) -def int64_counter(urn, metric, ptransform=None, tag=None): +def int64_counter(urn, metric, ptransform=None, pcollection=None): # type: (...) -> metrics_pb2.MonitoringInfo """Return the counter monitoring info for the specifed URN, metric and labels. @@ -159,33 +156,31 @@ def int64_counter(urn, metric, ptransform=None, tag=None): Args: urn: The URN of the monitoring info/metric. metric: The payload field to use in the monitoring info or an int value. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. + pcollection: The pcollection id used as a label. """ - labels = create_labels(ptransform=ptransform, tag=tag) + labels = create_labels(ptransform=ptransform, pcollection=pcollection) if isinstance(metric, int): metric = coders.VarIntCoder().encode(metric) return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels) -def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None): +def int64_user_distribution(namespace, name, metric, ptransform=None): """Return the distribution monitoring info for the URN, metric and labels. Args: urn: The URN of the monitoring info/metric. metric: The DistributionData for the metric. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. """ - labels = create_labels( - ptransform=ptransform, tag=tag, namespace=namespace, name=name) + labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) payload = _encode_distribution( coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) return create_monitoring_info( USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) -def int64_distribution(urn, metric, ptransform=None, tag=None): +def int64_distribution(urn, metric, ptransform=None, pcollection=None): # type: (...) -> metrics_pb2.MonitoringInfo """Return a distribution monitoring info for the URN, metric and labels. @@ -193,16 +188,16 @@ def int64_distribution(urn, metric, ptransform=None, tag=None): Args: urn: The URN of the monitoring info/metric. metric: The DistributionData for the metric. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. + pcollection: The pcollection id used as a label. """ - labels = create_labels(ptransform=ptransform, tag=tag) + labels = create_labels(ptransform=ptransform, pcollection=pcollection) payload = _encode_distribution( coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels) -def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None): +def int64_user_gauge(namespace, name, metric, ptransform=None): # type: (...) -> metrics_pb2.MonitoringInfo """Return the gauge monitoring info for the URN, metric and labels. @@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None): namespace: User-defined namespace of counter. name: Name of counter. metric: The GaugeData containing the metrics. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. """ - labels = create_labels( - ptransform=ptransform, tag=tag, namespace=namespace, name=name) + labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) if isinstance(metric, GaugeData): coder = coders.VarIntCoder() value = metric.value @@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None): USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels) -def int64_gauge(urn, metric, ptransform=None, tag=None): +def int64_gauge(urn, metric, ptransform=None): # type: (...) -> metrics_pb2.MonitoringInfo """Return the gauge monitoring info for the URN, metric and labels. @@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None): urn: The URN of the monitoring info/metric. metric: An int representing the value. The current time will be used for the timestamp. - ptransform: The ptransform/step name used as a label. - tag: The output tag name, used as a label. + ptransform: The ptransform id used as a label. """ - labels = create_labels(ptransform=ptransform, tag=tag) + labels = create_labels(ptransform=ptransform) if isinstance(metric, int): value = metric time_ms = int(time.time()) * 1000 diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 2cd69c3fbd61a..82359c27e7749 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1076,59 +1076,12 @@ def monitoring_infos(self): # Construct a new dict first to remove duplicates. all_monitoring_infos_dict = {} for transform_id, op in self.ops.items(): - for mi in op.monitoring_infos(transform_id).values(): - fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi) - all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi - - infos_list = list(all_monitoring_infos_dict.values()) - - def inject_pcollection(monitoring_info): - """ - If provided metric is element count metric: - Finds relevant transform output info in current process_bundle_descriptor - and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring - info. - """ - if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS: - if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels: - return - ptransform_label = monitoring_info.labels[ - monitoring_infos.PTRANSFORM_LABEL] - if not monitoring_infos.TAG_LABEL in monitoring_info.labels: - return - tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL] - - if not ptransform_label in self.process_bundle_descriptor.transforms: - return - if not tag_label in self.process_bundle_descriptor.transforms[ - ptransform_label].outputs: - return - - pcollection_name = ( - self.process_bundle_descriptor.transforms[ptransform_label]. - outputs[tag_label]) - - monitoring_info.labels[ - monitoring_infos.PCOLLECTION_LABEL] = pcollection_name - - # Cleaning up labels that are not in specification. - monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL) - monitoring_info.labels.pop(monitoring_infos.TAG_LABEL) - - for mi in infos_list: - inject_pcollection(mi) - - return infos_list + tag_to_pcollection_id = self.process_bundle_descriptor.transforms[ + transform_id].outputs + all_monitoring_infos_dict.update( + op.monitoring_infos(transform_id, tag_to_pcollection_id)) - def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info): - # type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo - actual_output_tags = list( - self.process_bundle_descriptor.transforms[transform_id].outputs.keys()) - if ('TAG' in monitoring_info.labels and - monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'): - if len(actual_output_tags) == 1: - monitoring_info.labels['TAG'] = actual_output_tags[0] - return monitoring_info + return list(all_monitoring_infos_dict.values()) def shutdown(self): # type: () -> None diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index 418cdcc2d6da1..36fa809ab8de6 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -73,8 +73,8 @@ cdef class Operation(object): cpdef output(self, WindowedValue windowed_value, int output_index=*) cpdef execution_time_monitoring_infos(self, transform_id) cpdef user_monitoring_infos(self, transform_id) - cpdef pcollection_count_monitoring_infos(self, transform_id) - cpdef monitoring_infos(self, transform_id) + cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id) + cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id) cdef class ReadOperation(Operation): diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 66f12f292bbfd..2a271369936df 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -337,43 +337,49 @@ def add_receiver(self, operation, output_index=0): """Adds a receiver operation for the specified output.""" self.consumers[output_index].append(operation) - def monitoring_infos(self, transform_id): - # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] + def monitoring_infos(self, transform_id, tag_to_pcollection_id): + # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] """Returns the list of MonitoringInfos collected by this operation.""" all_monitoring_infos = self.execution_time_monitoring_infos(transform_id) all_monitoring_infos.update( - self.pcollection_count_monitoring_infos(transform_id)) + self.pcollection_count_monitoring_infos(tag_to_pcollection_id)) all_monitoring_infos.update(self.user_monitoring_infos(transform_id)) return all_monitoring_infos - def pcollection_count_monitoring_infos(self, transform_id): + def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): + # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] + """Returns the element count MonitoringInfo collected by this operation.""" - if len(self.receivers) == 1: - # If there is exactly one output, we can unambiguously - # fix its name later, which we do. - # TODO(robertwb): Plumb the actual name here. - elem_count_mi = monitoring_infos.int64_counter( - monitoring_infos.ELEMENT_COUNT_URN, - self.receivers[0].opcounter.element_counter.value(), - ptransform=transform_id, - tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None), - ) - - (unused_mean, sum, count, min, max) = ( - self.receivers[0].opcounter.mean_byte_counter.value()) - - sampled_byte_count = monitoring_infos.int64_distribution( - monitoring_infos.SAMPLED_BYTE_SIZE_URN, - DistributionData(sum, count, min, max), - ptransform=transform_id, - tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None), - ) - return { - monitoring_infos.to_key(elem_count_mi): elem_count_mi, - monitoring_infos.to_key(sampled_byte_count): sampled_byte_count - } - return {} + + # Skip producing monitoring infos if there is more then one receiver + # since there is no way to provide a mapping from tag to pcollection id + # within Operation. + if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1: + return {} + + all_monitoring_infos = {} + pcollection_id = next(iter(tag_to_pcollection_id.values())) + receiver = self.receivers[0] + elem_count_mi = monitoring_infos.int64_counter( + monitoring_infos.ELEMENT_COUNT_URN, + receiver.opcounter.element_counter.value(), + pcollection=pcollection_id, + ) + + (unused_mean, sum, count, min, max) = ( + receiver.opcounter.mean_byte_counter.value()) + + sampled_byte_count = monitoring_infos.int64_distribution( + monitoring_infos.SAMPLED_BYTE_SIZE_URN, + DistributionData(sum, count, min, max), + pcollection=pcollection_id, + ) + all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi + all_monitoring_infos[monitoring_infos.to_key( + sampled_byte_count)] = sampled_byte_count + + return all_monitoring_infos def user_monitoring_infos(self, transform_id): """Returns the user MonitoringInfos collected by this operation.""" @@ -709,25 +715,31 @@ def reset(self): self.user_state_context.reset() self.dofn_runner.bundle_finalizer_param.reset() - def monitoring_infos(self, transform_id): - # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] - infos = super(DoOperation, self).monitoring_infos(transform_id) + def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): + # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] + + """Returns the element count MonitoringInfo collected by this operation.""" + infos = super( + DoOperation, + self).pcollection_count_monitoring_infos(tag_to_pcollection_id) + if self.tagged_receivers: for tag, receiver in self.tagged_receivers.items(): - mi = monitoring_infos.int64_counter( - monitoring_infos.ELEMENT_COUNT_URN, - receiver.opcounter.element_counter.value(), - ptransform=transform_id, - tag=str(tag)) - infos[monitoring_infos.to_key(mi)] = mi - (unused_mean, sum, count, min, max) = ( - receiver.opcounter.mean_byte_counter.value()) - sampled_byte_count = monitoring_infos.int64_distribution( - monitoring_infos.SAMPLED_BYTE_SIZE_URN, - DistributionData(sum, count, min, max), - ptransform=transform_id, - tag=str(tag)) - infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count + pcollection_id = tag_to_pcollection_id[str(tag)] + if pcollection_id: + mi = monitoring_infos.int64_counter( + monitoring_infos.ELEMENT_COUNT_URN, + receiver.opcounter.element_counter.value(), + pcollection=pcollection_id) + infos[monitoring_infos.to_key(mi)] = mi + (unused_mean, sum, count, min, max) = ( + receiver.opcounter.mean_byte_counter.value()) + sampled_byte_count = monitoring_infos.int64_distribution( + monitoring_infos.SAMPLED_BYTE_SIZE_URN, + DistributionData(sum, count, min, max), + pcollection=pcollection_id) + infos[monitoring_infos.to_key( + sampled_byte_count)] = sampled_byte_count return infos @@ -778,8 +790,8 @@ def current_element_progress(self): self.element_start_output_bytes) return None - def monitoring_infos(self, transform_id): - # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] + def monitoring_infos(self, transform_id, tag_to_pcollection_id): + # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] def encode_progress(value): # type: (float) -> bytes @@ -788,7 +800,7 @@ def encode_progress(value): with self.lock: infos = super(SdfProcessSizedElements, - self).monitoring_infos(transform_id) + self).monitoring_infos(transform_id, tag_to_pcollection_id) current_element_progress = self.current_element_progress() if current_element_progress: if current_element_progress.completed_work: