Skip to content

Commit

Permalink
Add Impulse primitive to Python SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha authored and robertwb committed Jun 19, 2018
1 parent 21b76f9 commit 4b25f37
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
'WindowInto',
'Flatten',
'Create',
'Impulse',
]


Expand Down Expand Up @@ -1894,3 +1895,27 @@ def _create_source(serialized_values, coder):
from apache_beam.transforms.create_source import _CreateSource

return _CreateSource(serialized_values, coder)

class Impulse(PTransform):
"""Primitive Impulse primitive."""

def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin), (
'Input to Impulse transform must be a PBegin but found %s' % pbegin)
return pvalue.PCollection(pbegin.pipeline)

def get_windowing(self, inputs):
return Windowing(GlobalWindows())

def infer_output_type(self, unused_input_type):
return bytes

def to_runner_api_parameter(self, context):
assert isinstance(self, Impulse), \
"expected instance of Impulse, but got %s" % self.__class__
return (common_urns.IMPULSE_TRANSFORM, None)

@PTransform.register_urn(common_urns.IMPULSE_TRANSFORM, None)
def from_runner_api_parameter(unused_parameter, unused_context):
return Impulse()

0 comments on commit 4b25f37

Please sign in to comment.