Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. #11514

Merged
merged 6 commits into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[BEAM-9488] Ensure we pass through PCollection ids instead of attempt…
…ing to fix them up.
  • Loading branch information
lukecwik committed Apr 23, 2020
commit 96cc611da4e17213e6458b11112e9c7343ba2509
60 changes: 26 additions & 34 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
NAMESPACE_LABEL = (
common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
TAG_LABEL = "TAG"


def extract_counter_value(monitoring_info_proto):
Expand Down Expand Up @@ -113,96 +112,92 @@ def extract_distribution(monitoring_info_proto):
coders.VarIntCoder(), monitoring_info_proto.payload)


def create_labels(ptransform=None, tag=None, namespace=None, name=None):
"""Create the label dictionary based on the provided tags.
def create_labels(ptransform=None, namespace=None, name=None, pcollection=None):
"""Create the label dictionary based on the provided values.

Args:
ptransform: The ptransform/step name.
tag: he output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = {}
if tag:
labels[TAG_LABEL] = tag
if ptransform:
labels[PTRANSFORM_LABEL] = ptransform
if namespace:
labels[NAMESPACE_LABEL] = namespace
if name:
labels[NAME_LABEL] = name
if pcollection:
labels[PCOLLECTION_LABEL] = pcollection
return labels


def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
def int64_user_counter(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(
USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)


def int64_counter(urn, metric, ptransform=None, tag=None):
def int64_counter(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)


def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
def int64_user_distribution(namespace, name, metric, ptransform=None):
"""Return the distribution monitoring info for the URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_distribution(urn, metric, ptransform=None, tag=None):
def int64_distribution(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return a distribution monitoring info for the URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
def int64_user_gauge(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
Expand All @@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
namespace: User-defined namespace of counter.
name: Name of counter.
metric: The GaugeData containing the metrics.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, GaugeData):
coder = coders.VarIntCoder()
value = metric.value
Expand All @@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels)


def int64_gauge(urn, metric, ptransform=None, tag=None):
def int64_gauge(urn, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
Expand All @@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None):
urn: The URN of the monitoring info/metric.
metric: An int representing the value. The current time will be used for
the timestamp.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform)
if isinstance(metric, int):
value = metric
time_ms = int(time.time()) * 1000
Expand Down
57 changes: 5 additions & 52 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,59 +1035,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
for mi in op.monitoring_infos(transform_id).values():
fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi

infos_list = list(all_monitoring_infos_dict.values())

def inject_pcollection(monitoring_info):
"""
If provided metric is element count metric:
Finds relevant transform output info in current process_bundle_descriptor
and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
info.
"""
if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
return
ptransform_label = monitoring_info.labels[
monitoring_infos.PTRANSFORM_LABEL]
if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
return
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]

if not ptransform_label in self.process_bundle_descriptor.transforms:
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a bug, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

if not tag_label in self.process_bundle_descriptor.transforms[
ptransform_label].outputs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this can happen, and might be what's happening here. There is no PCollection for this tag, but the user outputted a value to this tag. It would make sense to record this output even if we didn't use it. This is another downside of attaching these counters to PCollections themselves rather than to PTransform outputs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unknown outputs should probably be reported another way

return

pcollection_name = (
self.process_bundle_descriptor.transforms[ptransform_label].
outputs[tag_label])

monitoring_info.labels[
monitoring_infos.PCOLLECTION_LABEL] = pcollection_name

# Cleaning up labels that are not in specification.
monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)

for mi in infos_list:
inject_pcollection(mi)

return infos_list
pcollection_ids = self.process_bundle_descriptor.transforms[
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the id order aligns with the receiver order since transform_consumers built above iterates the outputs map in the same order and this gets plumbed down through to Operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice this might be OK (dicts have undefined, but I think when modified deterministic, iteration order), but seems rather brittle to me. Could we instead passed the tag -> pcollection_id mapping here?

transform_id].outputs.values()
all_monitoring_infos_dict.update(
op.monitoring_infos(transform_id, pcollection_ids))

def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
# type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo
actual_output_tags = list(
self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
if ('TAG' in monitoring_info.labels and
monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
if len(actual_output_tags) == 1:
monitoring_info.labels['TAG'] = actual_output_tags[0]
return monitoring_info
return list(all_monitoring_infos_dict.values())

def shutdown(self):
# type: () -> None
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, transform_id)
cpdef monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, pcollection_ids)
cpdef monitoring_infos(self, transform_id, pcollection_ids)


cdef class ReadOperation(Operation):
Expand Down
68 changes: 26 additions & 42 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,43 +337,48 @@ def add_receiver(self, operation, output_index=0):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, pcollection_ids):
# type: (str, list(str)) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
self.pcollection_count_monitoring_infos(transform_id))
self.pcollection_count_monitoring_infos(pcollection_ids))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos

def pcollection_count_monitoring_infos(self, transform_id):
def pcollection_count_monitoring_infos(self, pcollection_ids):
"""Returns the element count MonitoringInfo collected by this operation."""
if len(self.receivers) == 1:
# If there is exactly one output, we can unambiguously
# fix its name later, which we do.
# TODO(robertwb): Plumb the actual name here.
if len(self.receivers) != len(pcollection_ids):
raise RuntimeError(
'Unexpected number of receivers for number of pcollections %s %s' %
(self.receivers, pcollection_ids))

all_monitoring_infos = {}
for i in range(len(self.receivers)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will change if you use a mapping, but zip would be the idiom to use here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 using zip

receiver = self.receivers[i]
pcollection_id = pcollection_ids[i]
elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
self.receivers[0].opcounter.element_counter.value(),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)

(unused_mean, sum, count, min, max) = (
self.receivers[0].opcounter.mean_byte_counter.value())
receiver.opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
pcollection=pcollection_id,
)
return {
monitoring_infos.to_key(elem_count_mi): elem_count_mi,
monitoring_infos.to_key(sampled_byte_count): sampled_byte_count
}
return {}

all_monitoring_infos[monitoring_infos.to_key(
elem_count_mi)] = elem_count_mi
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count

return all_monitoring_infos

def user_monitoring_infos(self, transform_id):
"""Returns the user MonitoringInfos collected by this operation."""
Expand Down Expand Up @@ -708,27 +713,6 @@ def reset(self):
self.user_state_context.reset()
self.dofn_runner.bundle_finalizer_param.reset()

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
infos = super(DoOperation, self).monitoring_infos(transform_id)
if self.tagged_receivers:
for tag, receiver in self.tagged_receivers.items():
mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
return infos


class SdfProcessSizedElements(DoOperation):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -777,7 +761,7 @@ def current_element_progress(self):
self.element_start_output_bytes)
return None

def monitoring_infos(self, transform_id):
def monitoring_infos(self, transform_id, pcollection_ids):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

def encode_progress(value):
Expand All @@ -787,7 +771,7 @@ def encode_progress(value):

with self.lock:
infos = super(SdfProcessSizedElements,
self).monitoring_infos(transform_id)
self).monitoring_infos(transform_id, pcollection_ids)
current_element_progress = self.current_element_progress()
if current_element_progress:
if current_element_progress.completed_work:
Expand Down