Skip to content

Commit

Permalink
[BEAM-10118] Unconditionally use safe coders for data channels in FnA…
Browse files Browse the repository at this point in the history
…PI runner.
  • Loading branch information
robertwb committed Jun 4, 2020
1 parent edbe4ce commit ae36a5d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,8 @@ def add_data_channel_coder(self, pcoll_id):
self.components.windowing_strategies[
pcoll.windowing_strategy_id].window_coder_id
])
channel_coder = self.add_or_get_coder_id(
proto, pcoll.coder_id + '_windowed')
if pcoll.coder_id in self.safe_coders:
channel_coder = self.length_prefixed_coder(channel_coder)
self.data_channel_coders[pcoll_id] = channel_coder
self.data_channel_coders[pcoll_id] = self.maybe_length_prefixed_coder(
self.add_or_get_coder_id(proto, pcoll.coder_id + '_windowed'))

@memoize_on_instance
def with_state_iterables(self, coder_id):
Expand Down Expand Up @@ -404,23 +401,24 @@ def with_state_iterables(self, coder_id):
return new_coder_id

@memoize_on_instance
def length_prefixed_coder(self, coder_id):
def maybe_length_prefixed_coder(self, coder_id):
# type: (str) -> str
if coder_id in self.safe_coders:
return coder_id
length_prefixed_id, safe_id = self.length_prefixed_and_safe_coder(coder_id)
self.safe_coders[length_prefixed_id] = safe_id
return length_prefixed_id
(maybe_length_prefixed_id,
safe_id) = self.maybe_length_prefixed_and_safe_coder(coder_id)
self.safe_coders[maybe_length_prefixed_id] = safe_id
return maybe_length_prefixed_id

@memoize_on_instance
def length_prefixed_and_safe_coder(self, coder_id):
def maybe_length_prefixed_and_safe_coder(self, coder_id):
# type: (str) -> Tuple[str, str]
coder = self.components.coders[coder_id]
if coder.spec.urn == common_urns.coders.LENGTH_PREFIX.urn:
return coder_id, self.bytes_coder_id
elif coder.spec.urn in self._KNOWN_CODER_URNS:
new_component_ids = [
self.length_prefixed_coder(c) for c in coder.component_coder_ids
self.maybe_length_prefixed_coder(c) for c in coder.component_coder_ids
]
if new_component_ids == coder.component_coder_ids:
new_coder_id = coder_id
Expand Down Expand Up @@ -452,7 +450,7 @@ def length_prefixed_and_safe_coder(self, coder_id):
def length_prefix_pcoll_coders(self, pcoll_id):
# type: (str) -> None
self.components.pcollections[pcoll_id].coder_id = (
self.length_prefixed_coder(
self.maybe_length_prefixed_coder(
self.components.pcollections[pcoll_id].coder_id))


Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2653,7 +2653,7 @@ def expand(self, pbegin):
if not isinstance(pbegin, pvalue.PBegin):
raise TypeError(
'Input to Impulse transform must be a PBegin but found %s' % pbegin)
return pvalue.PCollection(pbegin.pipeline)
return pvalue.PCollection(pbegin.pipeline, element_type=bytes)

def get_windowing(self, inputs):
# type: (typing.Any) -> Windowing
Expand Down

0 comments on commit ae36a5d

Please sign in to comment.