Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. #11514

Merged
merged 6 commits into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup! Migrate to use tag -> pcollection id
  • Loading branch information
lukecwik committed Apr 24, 2020
commit 051c094e307d049210d7db025758f3c7efbc9ae7
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,10 +1035,10 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
pcollection_ids = self.process_bundle_descriptor.transforms[
transform_id].outputs.values()
tag_to_pcollection_id = self.process_bundle_descriptor.transforms[
transform_id].outputs
all_monitoring_infos_dict.update(
op.monitoring_infos(transform_id, pcollection_ids))
op.monitoring_infos(transform_id, tag_to_pcollection_id))

return list(all_monitoring_infos_dict.values())

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
cpdef pcollection_count_monitoring_infos(self, pcollection_ids)
cpdef monitoring_infos(self, transform_id, pcollection_ids)
cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id)
cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id)


cdef class ReadOperation(Operation):
Expand Down
92 changes: 61 additions & 31 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,44 +337,47 @@ def add_receiver(self, operation, output_index=0):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)

def monitoring_infos(self, transform_id, tag_to_pcollection_id):
  # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

  """Returns the dict of MonitoringInfos collected by this operation.

  Aggregates execution-time, per-pcollection element-count, and user metric
  MonitoringInfos, keyed so that duplicates collapse to a single entry.

  Args:
    transform_id: id of the transform these metrics are reported against.
    tag_to_pcollection_id: mapping from local output tag to pcollection id,
      taken from the ProcessBundleDescriptor transform outputs.
  """
  all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
  all_monitoring_infos.update(
      self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
  all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
  return all_monitoring_infos

def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
  # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

  """Returns the element count MonitoringInfos collected by this operation.

  The base Operation has no way to map an output tag to a pcollection id
  when there are multiple outputs, so metrics are only produced when there
  is exactly one receiver and exactly one output pcollection. Subclasses
  with tagged outputs (e.g. DoOperation) override this method.

  Args:
    tag_to_pcollection_id: mapping from local output tag to pcollection id.
  """
  # NOTE: a chained comparison such as
  #   len(self.receivers) != len(tag_to_pcollection_id) != 1
  # would short-circuit to False (and NOT skip) whenever both lengths are
  # equal but greater than one, so the two conditions are tested explicitly.
  if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1:
    return {}

  pcollection_id = next(iter(tag_to_pcollection_id.values()))
  # Exactly one receiver at this point; was previously a NameError
  # (`receivers[0]` without `self.`).
  receiver = self.receivers[0]

  all_monitoring_infos = {}
  elem_count_mi = monitoring_infos.int64_counter(
      monitoring_infos.ELEMENT_COUNT_URN,
      receiver.opcounter.element_counter.value(),
      pcollection=pcollection_id,
  )

  # mean_byte_counter.value() yields (mean, sum, count, min, max); the mean
  # is derivable from sum/count, so only the distribution fields are kept.
  (unused_mean, byte_sum, byte_count, byte_min, byte_max) = (
      receiver.opcounter.mean_byte_counter.value())
  sampled_byte_count = monitoring_infos.int64_distribution(
      monitoring_infos.SAMPLED_BYTE_SIZE_URN,
      DistributionData(byte_sum, byte_count, byte_min, byte_max),
      pcollection=pcollection_id,
  )

  all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
  all_monitoring_infos[monitoring_infos.to_key(
      sampled_byte_count)] = sampled_byte_count
  return all_monitoring_infos

Expand Down Expand Up @@ -711,6 +714,33 @@ def reset(self):
self.user_state_context.reset()
self.dofn_runner.bundle_finalizer_param.reset()

def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
  # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

  """Returns the element count MonitoringInfos collected by this operation.

  Extends the base implementation with per-tag metrics for every tagged
  output receiver, resolving each tag to its pcollection id via
  tag_to_pcollection_id.

  Args:
    tag_to_pcollection_id: mapping from local output tag to pcollection id.
  """
  infos = super(
      DoOperation,
      self).pcollection_count_monitoring_infos(tag_to_pcollection_id)

  if self.tagged_receivers:
    for tag, receiver in self.tagged_receivers.items():
      # .get() instead of direct indexing: a tag missing from the
      # descriptor's outputs must be skipped, not raise KeyError.
      pcollection_id = tag_to_pcollection_id.get(str(tag))
      if pcollection_id:
        mi = monitoring_infos.int64_counter(
            monitoring_infos.ELEMENT_COUNT_URN,
            receiver.opcounter.element_counter.value(),
            pcollection=pcollection_id)
        infos[monitoring_infos.to_key(mi)] = mi
        # (mean, sum, count, min, max); mean is redundant with sum/count.
        (unused_mean, byte_sum, byte_count, byte_min, byte_max) = (
            receiver.opcounter.mean_byte_counter.value())
        sampled_byte_count = monitoring_infos.int64_distribution(
            monitoring_infos.SAMPLED_BYTE_SIZE_URN,
            DistributionData(byte_sum, byte_count, byte_min, byte_max),
            pcollection=pcollection_id)
        infos[monitoring_infos.to_key(
            sampled_byte_count)] = sampled_byte_count
  return infos


class SdfProcessSizedElements(DoOperation):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -759,8 +789,8 @@ def current_element_progress(self):
self.element_start_output_bytes)
return None

def monitoring_infos(self, transform_id, pcollection_ids):
# type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]

def encode_progress(value):
# type: (float) -> bytes
Expand All @@ -769,7 +799,7 @@ def encode_progress(value):

with self.lock:
infos = super(SdfProcessSizedElements,
self).monitoring_infos(transform_id, pcollection_ids)
self).monitoring_infos(transform_id, tag_to_pcollection_id)
current_element_progress = self.current_element_progress()
if current_element_progress:
if current_element_progress.completed_work:
Expand Down