[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (apache#11514)

* [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.

* fixup! Convert to list since values() isn't subscriptable

* fixup! Use zip

* fixup! Migrate to use tag -> pcollection id

* fixup! lint

* fixup! Fix comparison
lukecwik authored and mxm committed Jul 8, 2020
1 parent 86e9f62 commit 3e62a5b
Showing 4 changed files with 94 additions and 137 deletions.
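In rough terms, the commit swaps a placeholder output-tag label, which a later pass had to rewrite, for a PCollection label attached at the source. A tiny illustration with made-up ids ('pardo-1' and 'pcollection-7' are hypothetical; the label keys match the code in the diffs below):

# Labels on an element-count monitoring info, before and after this commit.
# The ids are invented for illustration; only the label keys are real.
labels_before = {
    'PTRANSFORM': 'pardo-1',  # step id
    'TAG': 'ONLY_OUTPUT',     # placeholder tag; the SDK worker later looked up
}                             # the real output and rewrote it into PCOLLECTION
labels_after = {
    'PCOLLECTION': 'pcollection-7',  # attached directly from the
}                                    # tag -> pcollection id mapping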
60 changes: 26 additions & 34 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -78,7 +78,6 @@
NAMESPACE_LABEL = (
common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
TAG_LABEL = "TAG"


def extract_counter_value(monitoring_info_proto):
@@ -113,96 +112,92 @@ def extract_distribution(monitoring_info_proto):
coders.VarIntCoder(), monitoring_info_proto.payload)


def create_labels(ptransform=None, tag=None, namespace=None, name=None):
"""Create the label dictionary based on the provided tags.
def create_labels(ptransform=None, namespace=None, name=None, pcollection=None):
"""Create the label dictionary based on the provided values.
Args:
ptransform: The ptransform/step name.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = {}
if tag:
labels[TAG_LABEL] = tag
if ptransform:
labels[PTRANSFORM_LABEL] = ptransform
if namespace:
labels[NAMESPACE_LABEL] = namespace
if name:
labels[NAME_LABEL] = name
if pcollection:
labels[PCOLLECTION_LABEL] = pcollection
return labels


def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
def int64_user_counter(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.
Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(
USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)


def int64_counter(urn, metric, ptransform=None, tag=None):
def int64_counter(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.
Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)


def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
def int64_user_distribution(namespace, name, metric, ptransform=None):
"""Return the distribution monitoring info for the URN, metric and labels.
Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_distribution(urn, metric, ptransform=None, tag=None):
def int64_distribution(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return a distribution monitoring info for the URN, metric and labels.
Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
def int64_user_gauge(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
@@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
namespace: User-defined namespace of counter.
name: Name of counter.
metric: The GaugeData containing the metrics.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, GaugeData):
coder = coders.VarIntCoder()
value = metric.value
@@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels)


def int64_gauge(urn, metric, ptransform=None, tag=None):
def int64_gauge(urn, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
@@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None):
urn: The URN of the monitoring info/metric.
metric: An int representing the value. The current time will be used for
the timestamp.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform)
if isinstance(metric, int):
value = metric
time_ms = int(time.time()) * 1000
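A minimal usage sketch for the reworked helpers, assuming the signatures shown in the diff above; the counter value and the 'pcollection-7' id are made up:

from apache_beam.metrics import monitoring_infos

# Element-count counter labeled directly with its PCollection id.
mi = monitoring_infos.int64_counter(
    monitoring_infos.ELEMENT_COUNT_URN,
    42,  # made-up element count
    pcollection='pcollection-7')
# mi.labels now carries PCOLLECTION; there is no TAG label to fix up later.

# User metrics keep the PTRANSFORM label and simply lose the unused tag kwarg.
user_mi = monitoring_infos.int64_user_counter(
    'my_namespace', 'my_counter', 7, ptransform='pardo-1')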
57 changes: 5 additions & 52 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -1076,59 +1076,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
for mi in op.monitoring_infos(transform_id).values():
fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi

infos_list = list(all_monitoring_infos_dict.values())

def inject_pcollection(monitoring_info):
"""
If provided metric is element count metric:
Finds relevant transform output info in current process_bundle_descriptor
and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
info.
"""
if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
return
ptransform_label = monitoring_info.labels[
monitoring_infos.PTRANSFORM_LABEL]
if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
return
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]

if not ptransform_label in self.process_bundle_descriptor.transforms:
return
if not tag_label in self.process_bundle_descriptor.transforms[
ptransform_label].outputs:
return

pcollection_name = (
self.process_bundle_descriptor.transforms[ptransform_label].
outputs[tag_label])

monitoring_info.labels[
monitoring_infos.PCOLLECTION_LABEL] = pcollection_name

# Cleaning up labels that are not in specification.
monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)

for mi in infos_list:
inject_pcollection(mi)

return infos_list
tag_to_pcollection_id = self.process_bundle_descriptor.transforms[
transform_id].outputs
all_monitoring_infos_dict.update(
op.monitoring_infos(transform_id, tag_to_pcollection_id))

def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
# type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo
actual_output_tags = list(
self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
if ('TAG' in monitoring_info.labels and
monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
if len(actual_output_tags) == 1:
monitoring_info.labels['TAG'] = actual_output_tags[0]
return monitoring_info
return list(all_monitoring_infos_dict.values())

def shutdown(self):
# type: () -> None
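Paraphrased as a self-contained sketch (the parameters are stand-ins for self.ops and for the descriptor's transform outputs), the new aggregation in BundleProcessor.monitoring_infos boils down to:

# 'ops' maps transform id -> operation; 'transform_outputs' maps transform id
# -> {tag: pcollection id}. Both are hypothetical stand-ins for the real
# attributes used in bundle_processor.py.
def collect_monitoring_infos(ops, transform_outputs):
    all_monitoring_infos = {}  # keyed by monitoring_infos.to_key, so duplicates collapse
    for transform_id, op in ops.items():
        tag_to_pcollection_id = transform_outputs[transform_id]
        all_monitoring_infos.update(
            op.monitoring_infos(transform_id, tag_to_pcollection_id))
    return list(all_monitoring_infos.values())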
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.pxd
@@ -73,8 +73,8 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, transform_id)
cpdef monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id)
cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id)


cdef class ReadOperation(Operation):
110 changes: 61 additions & 49 deletions sdks/python/apache_beam/runners/worker/operations.py
@@ -337,43 +337,49 @@ def add_receiver(self, operation, output_index=0):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
self.pcollection_count_monitoring_infos(transform_id))
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos

def pcollection_count_monitoring_infos(self, transform_id):
def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
# type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the element count MonitoringInfo collected by this operation."""
if len(self.receivers) == 1:
# If there is exactly one output, we can unambiguously
# fix its name later, which we do.
# TODO(robertwb): Plumb the actual name here.
elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
self.receivers[0].opcounter.element_counter.value(),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
)

(unused_mean, sum, count, min, max) = (
self.receivers[0].opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
)
return {
monitoring_infos.to_key(elem_count_mi): elem_count_mi,
monitoring_infos.to_key(sampled_byte_count): sampled_byte_count
}
return {}

# Skip producing monitoring infos if there is more than one receiver
# since there is no way to provide a mapping from tag to pcollection id
# within Operation.
if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1:
return {}

all_monitoring_infos = {}
pcollection_id = next(iter(tag_to_pcollection_id.values()))
receiver = self.receivers[0]
elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)

(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count

return all_monitoring_infos

def user_monitoring_infos(self, transform_id):
"""Returns the user MonitoringInfos collected by this operation."""
@@ -709,25 +715,31 @@ def reset(self):
self.user_state_context.reset()
self.dofn_runner.bundle_finalizer_param.reset()

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
infos = super(DoOperation, self).monitoring_infos(transform_id)
def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
# type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the element count MonitoringInfo collected by this operation."""
infos = super(
DoOperation,
self).pcollection_count_monitoring_infos(tag_to_pcollection_id)

if self.tagged_receivers:
for tag, receiver in self.tagged_receivers.items():
mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
pcollection_id = tag_to_pcollection_id[str(tag)]
if pcollection_id:
mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
return infos


@@ -778,8 +790,8 @@ def current_element_progress(self):
self.element_start_output_bytes)
return None

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

def encode_progress(value):
# type: (float) -> bytes
@@ -788,7 +800,7 @@ def encode_progress(value):

with self.lock:
infos = super(SdfProcessSizedElements,
self).monitoring_infos(transform_id)
self).monitoring_infos(transform_id, tag_to_pcollection_id)
current_element_progress = self.current_element_progress()
if current_element_progress:
if current_element_progress.completed_work:
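For the multi-output case handled by DoOperation, the per-tag attribution reduces to roughly the following trimmed-down sketch; the dictionaries stand in for the tagged receivers' counters and for the descriptor's tag -> pcollection id map:

def per_output_element_counts(tagged_receiver_counts, tag_to_pcollection_id):
    # tagged_receiver_counts: {output tag: element count} (stand-in for the
    # real opcounter values). Returns {pcollection id: element count}.
    counts = {}
    for tag, count in tagged_receiver_counts.items():
        pcollection_id = tag_to_pcollection_id[str(tag)]
        if pcollection_id:  # as in the diff, falsy (e.g. empty) ids are skipped
            counts[pcollection_id] = count
    return counts

# Example: the 'dead_letter' output has no mapped PCollection id, so no
# counter is attributed to it.
print(per_output_element_counts(
    {'main': 10, 'dead_letter': 2},
    {'main': 'pcollection-7', 'dead_letter': ''}))  # {'pcollection-7': 10}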
