Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9488] [release-2.21.0] Ensure we pass through PCollection ids instead of attempt… #11687

Merged
merged 1 commit into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 26 additions & 34 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
NAMESPACE_LABEL = (
common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
TAG_LABEL = "TAG"


def extract_counter_value(monitoring_info_proto):
Expand Down Expand Up @@ -113,96 +112,92 @@ def extract_distribution(monitoring_info_proto):
coders.VarIntCoder(), monitoring_info_proto.payload)


def create_labels(ptransform=None, tag=None, namespace=None, name=None):
"""Create the label dictionary based on the provided tags.
def create_labels(ptransform=None, namespace=None, name=None, pcollection=None):
"""Create the label dictionary based on the provided values.

Args:
ptransform: The ptransform/step name.
tag: he output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = {}
if tag:
labels[TAG_LABEL] = tag
if ptransform:
labels[PTRANSFORM_LABEL] = ptransform
if namespace:
labels[NAMESPACE_LABEL] = namespace
if name:
labels[NAME_LABEL] = name
if pcollection:
labels[PCOLLECTION_LABEL] = pcollection
return labels


def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
def int64_user_counter(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(
USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)


def int64_counter(urn, metric, ptransform=None, tag=None):
def int64_counter(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the counter monitoring info for the specifed URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)


def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
def int64_user_distribution(namespace, name, metric, ptransform=None):
"""Return the distribution monitoring info for the URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_distribution(urn, metric, ptransform=None, tag=None):
def int64_distribution(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return a distribution monitoring info for the URN, metric and labels.

Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
def int64_user_gauge(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
Expand All @@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
namespace: User-defined namespace of counter.
name: Name of counter.
metric: The GaugeData containing the metrics.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(
ptransform=ptransform, tag=tag, namespace=namespace, name=name)
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, GaugeData):
coder = coders.VarIntCoder()
value = metric.value
Expand All @@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels)


def int64_gauge(urn, metric, ptransform=None, tag=None):
def int64_gauge(urn, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo

"""Return the gauge monitoring info for the URN, metric and labels.
Expand All @@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None):
urn: The URN of the monitoring info/metric.
metric: An int representing the value. The current time will be used for
the timestamp.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, tag=tag)
labels = create_labels(ptransform=ptransform)
if isinstance(metric, int):
value = metric
time_ms = int(time.time()) * 1000
Expand Down
57 changes: 5 additions & 52 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,59 +1076,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
for mi in op.monitoring_infos(transform_id).values():
fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi

infos_list = list(all_monitoring_infos_dict.values())

def inject_pcollection(monitoring_info):
"""
If provided metric is element count metric:
Finds relevant transform output info in current process_bundle_descriptor
and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
info.
"""
if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
return
ptransform_label = monitoring_info.labels[
monitoring_infos.PTRANSFORM_LABEL]
if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
return
tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]

if not ptransform_label in self.process_bundle_descriptor.transforms:
return
if not tag_label in self.process_bundle_descriptor.transforms[
ptransform_label].outputs:
return

pcollection_name = (
self.process_bundle_descriptor.transforms[ptransform_label].
outputs[tag_label])

monitoring_info.labels[
monitoring_infos.PCOLLECTION_LABEL] = pcollection_name

# Cleaning up labels that are not in specification.
monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)

for mi in infos_list:
inject_pcollection(mi)

return infos_list
tag_to_pcollection_id = self.process_bundle_descriptor.transforms[
transform_id].outputs
all_monitoring_infos_dict.update(
op.monitoring_infos(transform_id, tag_to_pcollection_id))

def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
# type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo
actual_output_tags = list(
self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
if ('TAG' in monitoring_info.labels and
monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
if len(actual_output_tags) == 1:
monitoring_info.labels['TAG'] = actual_output_tags[0]
return monitoring_info
return list(all_monitoring_infos_dict.values())

def shutdown(self):
# type: () -> None
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, transform_id)
cpdef monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id)
cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id)


cdef class ReadOperation(Operation):
Expand Down
110 changes: 61 additions & 49 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,43 +337,49 @@ def add_receiver(self, operation, output_index=0):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
self.pcollection_count_monitoring_infos(transform_id))
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos

def pcollection_count_monitoring_infos(self, transform_id):
def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
# type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the element count MonitoringInfo collected by this operation."""
if len(self.receivers) == 1:
# If there is exactly one output, we can unambiguously
# fix its name later, which we do.
# TODO(robertwb): Plumb the actual name here.
elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
self.receivers[0].opcounter.element_counter.value(),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
)

(unused_mean, sum, count, min, max) = (
self.receivers[0].opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
)
return {
monitoring_infos.to_key(elem_count_mi): elem_count_mi,
monitoring_infos.to_key(sampled_byte_count): sampled_byte_count
}
return {}

# Skip producing monitoring infos if there is more then one receiver
# since there is no way to provide a mapping from tag to pcollection id
# within Operation.
if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1:
return {}

all_monitoring_infos = {}
pcollection_id = next(iter(tag_to_pcollection_id.values()))
receiver = self.receivers[0]
elem_count_mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)

(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count

return all_monitoring_infos

def user_monitoring_infos(self, transform_id):
"""Returns the user MonitoringInfos collected by this operation."""
Expand Down Expand Up @@ -709,25 +715,31 @@ def reset(self):
self.user_state_context.reset()
self.dofn_runner.bundle_finalizer_param.reset()

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
infos = super(DoOperation, self).monitoring_infos(transform_id)
def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
# type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

"""Returns the element count MonitoringInfo collected by this operation."""
infos = super(
DoOperation,
self).pcollection_count_monitoring_infos(tag_to_pcollection_id)

if self.tagged_receivers:
for tag, receiver in self.tagged_receivers.items():
mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
ptransform=transform_id,
tag=str(tag))
infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
pcollection_id = tag_to_pcollection_id[str(tag)]
if pcollection_id:
mi = monitoring_infos.int64_counter(
monitoring_infos.ELEMENT_COUNT_URN,
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
return infos


Expand Down Expand Up @@ -778,8 +790,8 @@ def current_element_progress(self):
self.element_start_output_bytes)
return None

def monitoring_infos(self, transform_id):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

def encode_progress(value):
# type: (float) -> bytes
Expand All @@ -788,7 +800,7 @@ def encode_progress(value):

with self.lock:
infos = super(SdfProcessSizedElements,
self).monitoring_infos(transform_id)
self).monitoring_infos(transform_id, tag_to_pcollection_id)
current_element_progress = self.current_element_progress()
if current_element_progress:
if current_element_progress.completed_work:
Expand Down