[BEAM-1833] Fixes BEAM-1833
rohdesamuel committed Feb 18, 2020
1 parent aad3ec5 commit 2adf96b
Showing 3 changed files with 44 additions and 13 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -39,6 +39,7 @@

* ReadFromPubSub(topic=<topic>) in Python previously created a subscription under the same project as the topic. Now it will create the subscription under the project specified in pipeline_options. If the project is not specified in pipeline_options, then it will create the subscription under the same project as the topic. ([BEAM-3453](https://issues.apache.org/jira/browse/BEAM-3453)).
* SpannerAccessor in Java is now package-private to reduce API surface. `SpannerConfig.connectToSpanner` has been moved to `SpannerAccessor.create`. ([BEAM-9310](https://issues.apache.org/jira/browse/BEAM-9310)).
* PCollection output tags are now correctly propagated through the Pipeline. Users who rely on the old implementation, which assigned PCollection output ids from a monotonically increasing counter, can restore it with the "force_generated_pcollection_output_ids" experiment. The new implementation is the default (force_generated_pcollection_output_ids=False).

### Deprecations

@@ -48,10 +49,12 @@

* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Fixed exception when running in IPython notebook (Python) ([BEAM-9277](https://issues.apache.org/jira/browse/BEAM-9277)).
* Fixed propagation of PCollection output tags through the Pipeline (Python) ([BEAM-1833](https://issues.apache.org/jira/browse/BEAM-1833)).

### Known Issues

* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Auto-generated tags for nested PCollections may not be ideal ([BEAM-9322](https://issues.apache.org/jira/browse/BEAM-9322)).

# [2.19.0] - 2020-01-31

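The CHANGES.md entry above introduces the `force_generated_pcollection_output_ids` experiment. Below is a minimal sketch of opting back into the old output-id behavior; the pipeline body is illustrative, but the experiment name and the `lookup_experiment` call mirror the code added to `pipeline.py` in this commit.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

# Request the old behavior of monotonically increasing PCollection output ids.
options = PipelineOptions(
    ['--experiments=force_generated_pcollection_output_ids'])

# The SDK reads the flag the same way Pipeline.apply does in this commit.
assert options.view_as(DebugOptions).lookup_experiment(
    'force_generated_pcollection_output_ids', default=False)

# Any pipeline built with these options falls back to generated output ids.
with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | beam.Create([1, 2, 3])
        | beam.Map(lambda x: x * 2))
```

Omitting the experiment leaves the new default in place, in which declared output tags are propagated.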
52 changes: 40 additions & 12 deletions sdks/python/apache_beam/pipeline.py
@@ -295,7 +295,7 @@ def _replace_if_needed(self, original_transform_node):
elif len(inputs) == 1:
input_node = inputs[0]
elif len(inputs) == 0:
input_node = pvalue.PBegin(self)
input_node = pvalue.PBegin(self.pipeline)

# We have to add the new AppliedTransform to the stack before expand()
# and pop it out later to make sure that parts get added correctly.
@@ -309,12 +309,22 @@ def _replace_if_needed(self, original_transform_node):
self.pipeline._remove_labels_recursively(original_transform_node)

new_output = replacement_transform.expand(input_node)
assert isinstance(
new_output, (dict, pvalue.PValue, pvalue.DoOutputsTuple))

if isinstance(new_output, pvalue.PValue):
new_output.element_type = None
self.pipeline._infer_result_type(
replacement_transform, inputs, new_output)
replacement_transform_node.add_output(new_output)

if isinstance(new_output, dict):
for new_tag, new_pcoll in new_output.items():
replacement_transform_node.add_output(new_pcoll, new_tag)
elif isinstance(new_output, pvalue.DoOutputsTuple):
replacement_transform_node.add_output(
new_output, new_output._main_tag)
else:
replacement_transform_node.add_output(new_output, new_output.tag)

# Recording updated outputs. This cannot be done in the same visitor
# since if we dynamically update output type here, we'll run into
@@ -325,7 +335,8 @@ def _replace_if_needed(self, original_transform_node):
if isinstance(new_output, pvalue.PValue):
if not new_output.producer:
new_output.producer = replacement_transform_node
output_map[original_transform_node.outputs[None]] = new_output
output_map[original_transform_node.outputs[new_output.tag]] = \
new_output
elif isinstance(new_output, (pvalue.DoOutputsTuple, tuple)):
for pcoll in new_output:
if not pcoll.producer:
@@ -603,7 +614,30 @@ def apply(self, transform, pvalueish=None, label=None):
self._infer_result_type(transform, inputs, result)

assert isinstance(result.producer.inputs, tuple)
current.add_output(result)
# The DoOutputsTuple adds the PCollection to the outputs when accessed
# except for the main tag. Add the main tag here.
if isinstance(result, pvalue.DoOutputsTuple):
current.add_output(result, result._main_tag)
continue

# TODO(BEAM-9322): Find the best auto-generated tags for nested
# PCollections.
# If the user wants the old implementation of always generated
# PCollection output ids, then set the tag to None first, then count up
# from 1.
if self._options.view_as(DebugOptions).lookup_experiment(
'force_generated_pcollection_output_ids', default=False):
tag = len(current.outputs) if None in current.outputs else None
current.add_output(result, tag)
continue

# Otherwise default to the new implementation which only auto-generates
# tags for multiple PCollections with an unset tag.
if result.tag is None and None in current.outputs:
tag = len(current.outputs)
else:
tag = result.tag
current.add_output(result, tag)

if (type_options is not None and
type_options.type_check_strictness == 'ALL_REQUIRED' and
@@ -887,20 +921,14 @@ def replace_output(self,

def add_output(self,
output, # type: Union[pvalue.DoOutputsTuple, pvalue.PValue]
tag=None # type: Union[str, int, None]
tag # type: Union[str, int, None]
):
# type: (...) -> None
if isinstance(output, pvalue.DoOutputsTuple):
self.add_output(output[output._main_tag])
self.add_output(output[tag], tag)
elif isinstance(output, pvalue.PValue):
# TODO(BEAM-1833): Require tags when calling this method.
if tag is None and None in self.outputs:
tag = len(self.outputs)
assert tag not in self.outputs
self.outputs[tag] = output
elif isinstance(output, dict):
for output_tag, out in output.items():
self.add_output(out, tag=output_tag)
else:
raise TypeError("Unexpected output type: %s" % output)

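The new tag-selection logic in `Pipeline.apply` above keeps a declared tag and only auto-generates one when the tag is unset and an untagged output already exists. A small sketch of a multi-output ParDo whose declared tags are propagated as output ids under the new default; the DoFn and tag names here are hypothetical, while `with_outputs` and `TaggedOutput` are standard Beam API.

```python
import apache_beam as beam
from apache_beam import pvalue


class PartitionParityFn(beam.DoFn):
    """Sends odd elements to the 'odd' tag and keeps evens on the main output."""

    def process(self, element):
        if element % 2:
            yield pvalue.TaggedOutput('odd', element)
        else:
            yield element


with beam.Pipeline() as pipeline:
    results = (
        pipeline
        | beam.Create(range(10))
        | beam.ParDo(PartitionParityFn()).with_outputs('odd', main='even'))
    # Accessing a tag on the resulting DoOutputsTuple yields that tag's
    # PCollection. With this change the declared tags ('even', 'odd') flow
    # through the pipeline instead of being replaced by generated integer ids.
    evens, odds = results.even, results.odd
```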
1 change: 1 addition & 1 deletion
@@ -342,7 +342,7 @@ def visit_transform(self, transform_node):
transform_node.full_label + '/MapToVoidKey%s' % ix,
(side_input.pvalue, ))
new_side_input.pvalue.producer = map_to_void_key
map_to_void_key.add_output(new_side_input.pvalue)
map_to_void_key.add_output(new_side_input.pvalue, None)
parent.add_part(map_to_void_key)
elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
# Ensure the input coder is a KV coder and patch up the
