Skip to content

Commit

Permalink
[FLINK-23400][python] Support to decode a single record for the Pytho…
Browse files Browse the repository at this point in the history
…n coder

This closes apache#16508.
  • Loading branch information
dianfu committed Jul 16, 2021
1 parent c225a8c commit 41d3e22
Show file tree
Hide file tree
Showing 49 changed files with 1,659 additions and 1,373 deletions.
10 changes: 3 additions & 7 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,16 @@ from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
from apache_beam.coders.coder_impl cimport StreamCoderImpl

from pyflink.fn_execution.coder_impl_fast cimport LengthPrefixBaseCoderImpl, FieldCoderImpl
from pyflink.fn_execution.stream_fast cimport LengthPrefixInputStream, OutputStream
from pyflink.fn_execution.stream_fast cimport OutputStream

cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
cdef readonly StreamCoderImpl _value_coder

cdef class PassThroughPrefixCoderImpl(StreamCoderImpl):
cdef class FlinkFieldCoderBeamWrapper(StreamCoderImpl):
cdef FieldCoderImpl _value_coder
cdef OutputStream _data_out_stream

cdef void _write_data_output_stream(self, BOutputStream out_stream)

cdef class BeamCoderImpl(StreamCoderImpl):
cdef class FlinkLengthPrefixCoderBeamWrapper(StreamCoderImpl):
cdef readonly LengthPrefixBaseCoderImpl _value_coder

cdef class InputStreamWrapper:
cdef LengthPrefixBaseCoderImpl _value_coder
cdef LengthPrefixInputStream _input_stream
20 changes: 9 additions & 11 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
cpdef get_estimated_size_and_observables(self, value, bint nested=False):
return 0, []

cdef class PassThroughPrefixCoderImpl(StreamCoderImpl):
cdef class FlinkFieldCoderBeamWrapper(StreamCoderImpl):
"""
Bridge between Beam coder and Flink coder for the low-level FieldCoder.
"""
def __cinit__(self, value_coder):
self._value_coder = value_coder
self._data_out_stream = OutputStream()
Expand Down Expand Up @@ -84,8 +87,10 @@ cdef class PassThroughPrefixCoderImpl(StreamCoderImpl):
self._data_out_stream.pos = 0



cdef class BeamCoderImpl(StreamCoderImpl):
cdef class FlinkLengthPrefixCoderBeamWrapper(StreamCoderImpl):
"""
Bridge between Beam coder and Flink coder for the top-level LengthPrefixCoder.
"""
def __cinit__(self, value_coder):
self._value_coder = value_coder

Expand All @@ -94,11 +99,4 @@ cdef class BeamCoderImpl(StreamCoderImpl):

cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
cdef BeamInputStream input_stream = BeamInputStream(in_stream, in_stream.size())
cdef InputStreamWrapper input_stream_wrapper = InputStreamWrapper(self._value_coder,
input_stream)
return input_stream_wrapper

cdef class InputStreamWrapper:
def __cinit__(self, value_coder, input_stream):
self._value_coder = value_coder
self._input_stream = input_stream
return self._value_coder.decode_from_stream(input_stream)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def __repr__(self):
return 'PassThroughLengthPrefixCoderImpl[%s]' % self._value_coder


class BeamCoderImpl(StreamCoderImpl):
class FlinkCoderBeamWrapper(StreamCoderImpl):
"""
Bridge between Beam coder and Flink coder.
"""
def __init__(self, value_coder):
self._value_coder = value_coder
self._data_output_stream = OutputStream()
Expand All @@ -55,4 +58,4 @@ def decode_from_stream(self, in_stream: create_InputStream, nested):
return self._value_coder.decode_from_stream(data_input_stream)

def __repr__(self):
return 'BeamCoderImpl[%s]' % self._value_coder
return 'FlinkCoderBeamWrapper[%s]' % self._value_coder
27 changes: 16 additions & 11 deletions flink-python/pyflink/fn_execution/beam/beam_coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
from apache_beam.portability import common_urns
from apache_beam.typehints import typehints

from pyflink.fn_execution.coders import ArrowCoder, OverWindowArrowCoder, LengthPrefixBaseCoder
from pyflink.fn_execution.flink_fn_execution_pb2 import CoderParam
from pyflink.fn_execution.coders import (
ArrowCoder, OverWindowArrowCoder, LengthPrefixBaseCoder)
from pyflink.fn_execution.flink_fn_execution_pb2 import CoderInfoDescriptor

try:
from pyflink.fn_execution.beam import beam_coder_impl_fast as beam_coder_impl
from pyflink.fn_execution.beam.beam_coder_impl_fast import PassThroughPrefixCoderImpl
from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkFieldCoderBeamWrapper
from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
except ImportError:
from pyflink.fn_execution.beam import beam_coder_impl_slow as beam_coder_impl
PassThroughPrefixCoderImpl = beam_coder_impl.BeamCoderImpl
FlinkFieldCoderBeamWrapper = beam_coder_impl.FlinkCoderBeamWrapper
FlinkLengthPrefixCoderBeamWrapper = beam_coder_impl.FlinkCoderBeamWrapper

FLINK_CODER_URN = "flink:coder:v1"

Expand Down Expand Up @@ -64,19 +67,21 @@ def _create_impl(self):
def get_impl(self):
if isinstance(self._internal_coder, LengthPrefixBaseCoder):
if isinstance(self._internal_coder._field_coder, (ArrowCoder, OverWindowArrowCoder)):
from pyflink.fn_execution.beam.beam_coder_impl_slow import BeamCoderImpl
return BeamCoderImpl(self._create_impl())
from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkCoderBeamWrapper
return FlinkCoderBeamWrapper(self._create_impl())
else:
return beam_coder_impl.BeamCoderImpl(self._create_impl())
return FlinkLengthPrefixCoderBeamWrapper(self._create_impl())
else:
return PassThroughPrefixCoderImpl(self._create_impl())
return FlinkFieldCoderBeamWrapper(self._create_impl())

def to_type_hint(self):
return typehints.Any

@Coder.register_urn(FLINK_CODER_URN, CoderParam)
def _pickle_from_runner_api_parameter(coder_praram_proto, unused_components, unused_context):
return FlinkCoder(LengthPrefixBaseCoder.from_coder_param_proto(coder_praram_proto))
@Coder.register_urn(FLINK_CODER_URN, CoderInfoDescriptor)
def _pickle_from_runner_api_parameter(
coder_info_descriptor_proto, unused_components, unused_context):
return FlinkCoder(LengthPrefixBaseCoder.from_coder_info_descriptor_proto(
coder_info_descriptor_proto))

def __repr__(self):
return 'FlinkCoder[%s]' % repr(self._internal_coder)
Expand Down
22 changes: 8 additions & 14 deletions flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from libc.stdint cimport *
from apache_beam.utils.windowed_value cimport WindowedValue

from pyflink.fn_execution.coder_impl_fast cimport LengthPrefixBaseCoderImpl
from pyflink.fn_execution.beam.beam_stream_fast cimport BeamInputStream, BeamOutputStream
from pyflink.fn_execution.beam.beam_coder_impl_fast cimport InputStreamWrapper, BeamCoderImpl
from pyflink.fn_execution.beam.beam_stream_fast cimport BeamOutputStream
from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper
from pyflink.fn_execution.table.operations import BundleOperation

cdef class FunctionOperation(Operation):
Expand All @@ -38,7 +38,7 @@ cdef class FunctionOperation(Operation):
self.consumer = consumers['output'][0]
self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder

if isinstance(self._value_coder_impl, BeamCoderImpl):
if isinstance(self._value_coder_impl, FlinkLengthPrefixCoderBeamWrapper):
self._is_python_coder = False
self._output_coder = self._value_coder_impl._value_coder
else:
Expand All @@ -64,8 +64,6 @@ cdef class FunctionOperation(Operation):

cpdef process(self, WindowedValue o):
cdef InputStreamWrapper input_stream_wrapper
cdef BeamInputStream input_stream
cdef LengthPrefixBaseCoderImpl input_coder
cdef BeamOutputStream output_stream
with self.scoped_process_state:
if self._is_python_coder:
Expand All @@ -75,19 +73,15 @@ cdef class FunctionOperation(Operation):
self.consumer.output_stream.maybe_flush()
else:
input_stream_wrapper = o.value
input_stream = input_stream_wrapper._input_stream
input_coder = input_stream_wrapper._value_coder
output_stream = BeamOutputStream(self.consumer.output_stream)
if isinstance(self.operation, BundleOperation):
while input_stream.available():
input_data = input_coder.decode_from_stream(input_stream)
self.process_element(input_data)
while input_stream_wrapper.has_next():
self.process_element(input_stream_wrapper.next())
result = self.operation.finish_bundle()
self._output_coder.encode_to_stream(result, output_stream)
else:
while input_stream.available():
input_data = input_coder.decode_from_stream(input_stream)
result = self.process_element(input_data)
while input_stream_wrapper.has_next():
result = self.process_element(input_stream_wrapper.next())
self._output_coder.encode_to_stream(result, output_stream)
output_stream.flush()

Expand Down
9 changes: 8 additions & 1 deletion flink-python/pyflink/fn_execution/coder_impl_fast.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,16 @@ cdef class FieldCoderImpl:
cpdef bytes encode(self, value)
cpdef decode(self, encoded)

cdef class InputStreamWrapper:
cdef ValueCoderImpl _value_coder
cdef LengthPrefixInputStream _input_stream

cpdef bint has_next(self)
cpdef next(self)

cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
cdef char*_end_message
cdef bint _writes_end_message
cdef bint _separated_with_end_message

cdef class ValueCoderImpl(LengthPrefixBaseCoderImpl):
pass
Expand Down
52 changes: 33 additions & 19 deletions flink-python/pyflink/fn_execution/coder_impl_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,7 @@ cdef class LengthPrefixBaseCoderImpl:
pass

cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
cdef char*input_data
cdef char*temp
cdef size_t size
cdef long long addr
cdef InputStream data_input_stream
# set input_data pointer to the input data
size = input_stream.read(&input_data)

# create InputStream
data_input_stream = InputStream()
data_input_stream._input_data = input_data

return self._field_coder.decode_from_stream(data_input_stream, size)
pass

cdef void _write_data_to_output_stream(self, LengthPrefixOutputStream output_stream):
cdef OutputStream data_out_stream
Expand Down Expand Up @@ -247,6 +235,17 @@ cdef class FieldCoderImpl:
input_stream._input_data = <char*> encoded
return self.decode_from_stream(input_stream, len(encoded))

cdef class InputStreamWrapper:
def __cinit__(self, value_coder: ValueCoderImpl, input_stream: LengthPrefixInputStream):
self._value_coder = value_coder
self._input_stream = input_stream

cpdef bint has_next(self):
return self._input_stream.available()

cpdef next(self):
return self._value_coder.decode_from_stream(self._input_stream)

cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
"""
Encodes iterable data to output stream. The output mode will decide whether write a special end
Expand All @@ -259,22 +258,22 @@ cdef class IterableCoderImpl(LengthPrefixBaseCoderImpl):
raise MemoryError()
self._end_message[0] = 0x00

def __init__(self, field_coder: FieldCoderImpl, output_mode):
def __init__(self, field_coder: FieldCoderImpl, separated_with_end_message: bool):
super(IterableCoderImpl, self).__init__(field_coder)
if output_mode == flink_fn_execution_pb2.CoderParam.MULTIPLE:
self._writes_end_message = False
else:
self._writes_end_message = True
self._separated_with_end_message = separated_with_end_message

cpdef encode_to_stream(self, value, LengthPrefixOutputStream output_stream):
for item in value:
self._field_coder.encode_to_stream(item, self._data_out_stream)
self._write_data_to_output_stream(output_stream)

# write end message
if self._writes_end_message:
if self._separated_with_end_message:
output_stream.write(self._end_message, 1)

cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
return InputStreamWrapper(ValueCoderImpl(self._field_coder), input_stream)

def __dealloc__(self):
if self._end_message != NULL:
free(self._end_message)
Expand All @@ -291,6 +290,21 @@ cdef class ValueCoderImpl(LengthPrefixBaseCoderImpl):
self._field_coder.encode_to_stream(value, self._data_out_stream)
self._write_data_to_output_stream(output_stream)

cpdef decode_from_stream(self, LengthPrefixInputStream input_stream):
cdef char*input_data
cdef char*temp
cdef size_t size
cdef long long addr
cdef InputStream data_input_stream
# set input_data pointer to the input data
size = input_stream.read(&input_data)

# create InputStream
data_input_stream = InputStream()
data_input_stream._input_data = input_data

return self._field_coder.decode_from_stream(data_input_stream, size)

cdef class FlattenRowCoderImpl(FieldCoderImpl):
"""
A coder for flatten row (List) object (without field names and row kind value is 0).
Expand Down
18 changes: 10 additions & 8 deletions flink-python/pyflink/fn_execution/coder_impl_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from pyflink.common import Row, RowKind
from pyflink.datastream.window import TimeWindow, CountWindow
from pyflink.fn_execution.ResettableIO import ResettableIO
from pyflink.fn_execution.flink_fn_execution_pb2 import CoderParam
from pyflink.fn_execution.stream_slow import InputStream, OutputStream
from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas

Expand All @@ -44,10 +43,6 @@ def __init__(self, field_coder: 'FieldCoderImpl'):
self._field_coder = field_coder
self._data_out_stream = OutputStream()

def decode_from_stream(self, in_stream: InputStream):
while in_stream.size() > 0:
yield self._field_coder.decode_from_stream(in_stream, in_stream.read_var_int64())

def _write_data_to_output_stream(self, out_stream: OutputStream):
out_stream.write_var_int64(self._data_out_stream.size())
out_stream.write(self._data_out_stream.get())
Expand Down Expand Up @@ -92,20 +87,24 @@ class IterableCoderImpl(LengthPrefixBaseCoderImpl):
message 0x00 to output stream after encoding data.
"""

def __init__(self, field_coder: 'FieldCoderImpl', output_mode):
def __init__(self, field_coder: 'FieldCoderImpl', separated_with_end_message: bool):
super(IterableCoderImpl, self).__init__(field_coder)
self._output_mode = output_mode
self._separated_with_end_message = separated_with_end_message

def encode_to_stream(self, value: List, out_stream):
for item in value:
self._field_coder.encode_to_stream(item, self._data_out_stream)
self._write_data_to_output_stream(out_stream)

# write end message
if self._output_mode == CoderParam.MULTIPLE_WITH_END:
if self._separated_with_end_message:
out_stream.write_var_int64(1)
out_stream.write_byte(0x00)

def decode_from_stream(self, in_stream: InputStream):
while in_stream.size() > 0:
yield self._field_coder.decode_from_stream(in_stream, in_stream.read_var_int64())


class ValueCoderImpl(LengthPrefixBaseCoderImpl):
"""
Expand All @@ -119,6 +118,9 @@ def encode_to_stream(self, value, out_stream):
self._field_coder.encode_to_stream(value, self._data_out_stream)
self._write_data_to_output_stream(out_stream)

def decode_from_stream(self, in_stream: InputStream):
return self._field_coder.decode_from_stream(in_stream, in_stream.read_var_int64())


class MaskUtils:
"""
Expand Down
Loading

0 comments on commit 41d3e22

Please sign in to comment.