Skip to content

Commit

Permalink
[FLINK-17119][python] Add Cython support for composite types (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangXingBo committed Apr 20, 2020
1 parent 3cd8716 commit 5456360
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 0 deletions.
25 changes: 25 additions & 0 deletions flink-python/pyflink/fn_execution/fast_coder_impl.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):

# encode data to output_stream
cdef void _encode_one_row(self, value)
cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
item)
cdef void _encode_field_simple(self, TypeName field_type, item)
cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item)
cdef void _extend(self, size_t missing)
cdef void _encode_byte(self, unsigned char val)
cdef void _encode_smallint(self, libc.stdint.int16_t v)
Expand All @@ -117,7 +120,9 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):

# decode data from input_stream
cdef void _decode_next_row(self)
cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder)
cdef object _decode_field_simple(self, TypeName field_type)
cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder)
cdef unsigned char _decode_byte(self) except? -1
cdef libc.stdint.int16_t _decode_smallint(self) except? -1
cdef libc.stdint.int32_t _decode_int(self) except? -1
Expand Down Expand Up @@ -190,3 +195,23 @@ cdef class DateCoderImpl(BaseCoder):

# Declaration of the TIME coder; it adds no C-level attributes of its own.
cdef class TimeCoderImpl(BaseCoder):
pass

# Declaration of the DECIMAL coder and the attributes the row coder reads from it.
cdef class DecimalCoderImpl(BaseCoder):
# decimal context installed (via decimal.setcontext) while a value is converted
cdef readonly object context
# exponent passed to Decimal.quantize() so every value carries the declared scale
cdef readonly object scale_format

# Declaration of the TIMESTAMP coder.
cdef class TimestampCoderImpl(BaseCoder):
# when true only the bigint millisecond value is read/written; otherwise an
# additional int nanosecond component follows it
cdef readonly bint is_compact

# Declaration of the LOCAL_ZONED_TIMESTAMP coder; inherits is_compact from
# TimestampCoderImpl.
cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
# object whose localize() method is applied to decoded naive datetimes
# NOTE(review): presumably a pytz timezone - confirm against the caller
cdef readonly object timezone

# Declaration of the ARRAY coder.
cdef class ArrayCoderImpl(BaseCoder):
# coder used for every element of the array
cdef readonly BaseCoder elem_coder

# Declaration of the MAP coder.
cdef class MapCoderImpl(BaseCoder):
# coder used for every key of the map
cdef readonly BaseCoder key_coder
# coder used for every non-null value of the map
cdef readonly BaseCoder value_coder

# Declaration of the ROW coder.
cdef class RowCoderImpl(BaseCoder):
# coders of the nested row's fields, indexed by field position
cdef readonly list field_coders
246 changes: 246 additions & 0 deletions flink-python/pyflink/fn_execution/fast_coder_impl.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ cimport libc.stdlib
from libc.string cimport strlen

import datetime
import decimal
from pyflink.table import Row

cdef class InputStreamAndFunctionWrapper:
def __cinit__(self, func, input_stream_wrapper):
Expand Down Expand Up @@ -143,6 +145,9 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
if item is not None:
if self._output_coder_type[i] == SIMPLE:
self._encode_field_simple(self._output_field_type[i], item)
else:
self._encode_field_complex(self._output_field_type[i],
self._output_field_coders[i], item)

self._copy_to_output_buffer()

Expand Down Expand Up @@ -180,6 +185,15 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
else:
if self._input_coder_type[i] == SIMPLE:
self.row[i] = self._decode_field_simple(self._input_field_type[i])
else:
self.row[i] = self._decode_field_complex(self._input_field_type[i],
self._input_field_coders[i])

cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder):
    """Decode one non-null field, dispatching on the coder category.

    SIMPLE coders are handled by ``_decode_field_simple`` (which needs only the
    type tag); every other category goes to ``_decode_field_complex`` together
    with the coder that carries the type parameters.
    """
    if coder_type != SIMPLE:
        return self._decode_field_complex(field_type, field_coder)
    return self._decode_field_simple(field_type)

cdef object _decode_field_simple(self, TypeName field_type):
cdef libc.stdint.int32_t value, minutes, seconds, hours
Expand Down Expand Up @@ -227,6 +241,89 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
minutes %= 60
return datetime.time(hours, minutes, seconds, milliseconds * 1000)

cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder):
    """Decode one non-null composite value from the input buffer.

    :param field_type: type tag selecting the layout to read (DECIMAL,
                       TIMESTAMP, LOCAL_ZONED_TIMESTAMP, ARRAY, MAP or ROW).
    :param field_coder: coder instance carrying the parameters of that type
                        (decimal context, compactness, nested coders, ...).
    :return: a ``decimal.Decimal``, ``datetime.datetime``, ``list``, ``dict`` or
             ``Row``; ``None`` when ``field_type`` is none of the handled tags.
    :raises MemoryError: if the temporary null mask of a ROW cannot be allocated.
    """
    cdef libc.stdint.int32_t nanoseconds, microseconds, length
    cdef libc.stdint.int32_t i, row_field_count, leading_complete_bytes_num, remaining_bits_num
    # Seconds since the epoch must be 64 bit wide: a 32 bit value cannot hold
    # timestamps later than 2038-01-19.
    cdef libc.stdint.int64_t milliseconds, seconds
    cdef bint*null_mask
    cdef BaseCoder value_coder, key_coder, row_field_coder
    cdef TypeName value_type, key_type
    cdef CoderType value_coder_type, key_coder_type
    cdef list row_field_coders, row_values

    if field_type == DECIMAL:
        # The value is transferred as UTF-8 text and re-quantized to the declared
        # scale under the coder's own context.
        user_context = decimal.getcontext()
        decimal.setcontext((<DecimalCoderImpl> field_coder).context)
        try:
            result = decimal.Decimal(self._decode_bytes().decode("utf-8")).quantize(
                (<DecimalCoderImpl> field_coder).scale_format)
        finally:
            # Always give the caller's context back, even if parsing fails.
            decimal.setcontext(user_context)
        return result
    elif field_type == TIMESTAMP or field_type == LOCAL_ZONED_TIMESTAMP:
        # Layout: bigint milliseconds, followed by int nanoseconds unless the coder
        # is compact. LocalZonedTimestampCoderImpl derives from TimestampCoderImpl,
        # so one typed cast serves both tags.
        milliseconds = self._decode_bigint()
        if (<TimestampCoderImpl> field_coder).is_compact:
            nanoseconds = 0
        else:
            nanoseconds = self._decode_int()
        seconds = milliseconds // 1000
        microseconds = <libc.stdint.int32_t> (milliseconds % 1000) * 1000 + nanoseconds // 1000
        timestamp = datetime.datetime.utcfromtimestamp(seconds).replace(
            microsecond=microseconds)
        if field_type == LOCAL_ZONED_TIMESTAMP:
            return (<LocalZonedTimestampCoderImpl> field_coder).timezone.localize(timestamp)
        return timestamp
    elif field_type == ARRAY:
        # Layout: int length, then per element a flag byte (non-zero = value
        # follows, zero = None).
        length = self._decode_int()
        value_coder = (<ArrayCoderImpl> field_coder).elem_coder
        value_type = value_coder.type_name()
        value_coder_type = value_coder.coder_type()
        return [self._decode_field(value_coder_type, value_type, value_coder)
                if self._decode_byte() else None for _ in range(length)]
    elif field_type == MAP:
        # Layout: int length, then per entry the key, a flag byte and - unless the
        # flag is set - the value. Note the polarity is the opposite of ARRAY:
        # here a non-zero flag means the value is None.
        key_coder = (<MapCoderImpl> field_coder).key_coder
        key_type = key_coder.type_name()
        key_coder_type = key_coder.coder_type()
        value_coder = (<MapCoderImpl> field_coder).value_coder
        value_type = value_coder.type_name()
        value_coder_type = value_coder.coder_type()
        length = self._decode_int()
        map_value = {}
        for _ in range(length):
            key = self._decode_field(key_coder_type, key_type, key_coder)
            if self._decode_byte():
                map_value[key] = None
            else:
                map_value[key] = self._decode_field(value_coder_type, value_type, value_coder)
        return map_value
    elif field_type == ROW:
        # Layout: null mask covering all fields, then the non-null fields in order.
        row_field_coders = (<RowCoderImpl> field_coder).field_coders
        row_field_count = len(row_field_coders)
        leading_complete_bytes_num = row_field_count // 8
        remaining_bits_num = row_field_count % 8
        null_mask = <bint*> libc.stdlib.malloc(row_field_count * sizeof(bint))
        # malloc(0) may legitimately return NULL, so only a non-empty row counts
        # as an allocation failure.
        if null_mask == NULL and row_field_count > 0:
            raise MemoryError()
        try:
            self._read_null_mask(null_mask, leading_complete_bytes_num, remaining_bits_num)
            row_values = []
            for i in range(row_field_count):
                if null_mask[i]:
                    row_values.append(None)
                else:
                    row_field_coder = row_field_coders[i]
                    row_values.append(self._decode_field(
                        row_field_coder.coder_type(),
                        row_field_coder.type_name(),
                        row_field_coder))
        finally:
            # Release the mask even when a nested field fails to decode.
            libc.stdlib.free(null_mask)
        return Row(*row_values)

cdef unsigned char _decode_byte(self) except? -1:
    """Consume a single byte from the input buffer and return it unsigned."""
    # Step the read cursor forward first, then hand back the byte it just passed.
    self._input_pos = self._input_pos + 1
    return <unsigned char> self._input_data[self._input_pos - 1]
Expand Down Expand Up @@ -297,6 +394,13 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
self.row = [None for _ in range(self._input_field_count)]
self.func = input_stream_and_function_wrapper.func

cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
                        item):
    """Encode one non-null field, dispatching on the coder category.

    SIMPLE coders are written by ``_encode_field_simple`` (which needs only the
    type tag); every other category goes to ``_encode_field_complex`` together
    with the coder that carries the type parameters.
    """
    if coder_type != SIMPLE:
        self._encode_field_complex(field_type, field_coder, item)
        return
    self._encode_field_simple(field_type, item)

cdef void _encode_field_simple(self, TypeName field_type, item):
cdef libc.stdint.int32_t hour, minute, seconds, microsecond, milliseconds
if field_type == TINYINT:
Expand Down Expand Up @@ -340,6 +444,85 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + microsecond // 1000
self._encode_int(milliseconds)

cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item):
    """Encode one non-null composite value into the output buffer.

    :param field_type: type tag selecting the layout to write (DECIMAL,
                       TIMESTAMP, LOCAL_ZONED_TIMESTAMP, ARRAY, MAP or ROW);
                       any other tag writes nothing.
    :param field_coder: coder instance carrying the parameters of that type
                        (decimal context, compactness, nested coders, ...).
    :param item: the Python value to encode; must not be ``None``.
    """
    cdef libc.stdint.int32_t nanoseconds, microseconds_of_second, length, row_field_count
    cdef libc.stdint.int32_t i, leading_complete_bytes_num, remaining_bits_num
    cdef libc.stdint.int64_t timestamp_milliseconds, timestamp_seconds
    cdef BaseCoder value_coder, key_coder
    cdef TypeName value_type, key_type
    cdef CoderType value_coder_type, key_coder_type
    cdef BaseCoder row_field_coder
    cdef list row_field_coders, row_value

    if field_type == DECIMAL:
        # The value is quantized to the declared scale under the coder's own
        # context and transferred as UTF-8 text.
        user_context = decimal.getcontext()
        decimal.setcontext((<DecimalCoderImpl> field_coder).context)
        try:
            bytes_value = str(
                item.quantize((<DecimalCoderImpl> field_coder).scale_format)).encode("utf-8")
            self._encode_bytes(bytes_value)
        finally:
            # Always give the caller's context back, even if quantize() fails.
            decimal.setcontext(user_context)
    elif field_type == TIMESTAMP or field_type == LOCAL_ZONED_TIMESTAMP:
        # Layout: bigint milliseconds, followed by int nanoseconds unless the coder
        # is compact. The wall-clock fields of ``item`` are interpreted as UTC.
        #
        # Whole seconds are computed with the microseconds zeroed so that the float
        # returned by timestamp() is integral. Truncating a fractional timestamp
        # instead rounds pre-epoch values toward zero (one second too late), and
        # float rounding can bump far-future values into the next second.
        timestamp_seconds = <libc.stdint.int64_t> (
            item.replace(tzinfo=datetime.timezone.utc, microsecond=0).timestamp())
        microseconds_of_second = item.microsecond
        timestamp_milliseconds = timestamp_seconds * 1000 + microseconds_of_second // 1000
        nanoseconds = microseconds_of_second % 1000 * 1000
        self._encode_bigint(timestamp_milliseconds)
        # LocalZonedTimestampCoderImpl derives from TimestampCoderImpl, so the
        # typed cast is valid for both tags.
        if not (<TimestampCoderImpl> field_coder).is_compact:
            self._encode_int(nanoseconds)
    elif field_type == ARRAY:
        # Layout: int length, then per element a flag byte (True = value follows,
        # False = None).
        length = len(item)
        value_coder = (<ArrayCoderImpl> field_coder).elem_coder
        value_type = value_coder.type_name()
        value_coder_type = value_coder.coder_type()
        self._encode_int(length)
        for i in range(length):
            value = item[i]
            if value is None:
                self._encode_byte(False)
            else:
                self._encode_byte(True)
                self._encode_field(value_coder_type, value_type, value_coder, value)
    elif field_type == MAP:
        # Layout: int length, then per entry the key, a flag byte and - unless the
        # flag is set - the value. Note the polarity is the opposite of ARRAY:
        # here True means the value is None.
        length = len(item)
        self._encode_int(length)
        key_coder = (<MapCoderImpl> field_coder).key_coder
        key_type = key_coder.type_name()
        key_coder_type = key_coder.coder_type()
        value_coder = (<MapCoderImpl> field_coder).value_coder
        value_type = value_coder.type_name()
        value_coder_type = value_coder.coder_type()
        for key, value in item.items():
            self._encode_field(key_coder_type, key_type, key_coder, key)
            if value is None:
                self._encode_byte(True)
            else:
                self._encode_byte(False)
                self._encode_field(value_coder_type, value_type, value_coder, value)
    elif field_type == ROW:
        # Layout: null mask covering all fields, then the non-null fields in order.
        row_field_coders = (<RowCoderImpl> field_coder).field_coders
        row_field_count = len(row_field_coders)
        leading_complete_bytes_num = row_field_count // 8
        remaining_bits_num = row_field_count % 8
        row_value = list(item)
        self._write_null_mask(row_value, leading_complete_bytes_num, remaining_bits_num)
        for i in range(row_field_count):
            field_item = row_value[i]
            if field_item is not None:
                row_field_coder = row_field_coders[i]
                self._encode_field(row_field_coder.coder_type(), row_field_coder.type_name(),
                                   row_field_coder, field_item)

cdef void _copy_to_output_buffer(self):
cdef size_t size
cdef size_t i
Expand Down Expand Up @@ -551,3 +734,66 @@ cdef class TimeCoderImpl(BaseCoder):

# Report the TIME type tag for this coder.
cpdef TypeName type_name(self):
return TIME

cdef class DecimalCoderImpl(BaseCoder):
    """Coder descriptor for DECIMAL fields.

    Stores a decimal context with the requested precision and the exponent
    ``10 ** -scale`` used to quantize values.
    """

    def __cinit__(self, precision, scale):
        requested_context = decimal.Context(prec=precision)
        self.context = requested_context
        ten = decimal.Decimal(10)
        self.scale_format = ten ** -scale

    cpdef TypeName type_name(self):
        """Return the DECIMAL type tag."""
        return DECIMAL

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX

cdef class TimestampCoderImpl(BaseCoder):
    """Coder descriptor for TIMESTAMP fields.

    A precision of at most 3 marks the coder as compact.
    """

    def __init__(self, precision):
        # __init__ rather than __cinit__, so a subclass can accept extra
        # constructor arguments and forward only the precision.
        fits_in_milliseconds = precision <= 3
        self.is_compact = fits_in_milliseconds

    cpdef TypeName type_name(self):
        """Return the TIMESTAMP type tag."""
        return TIMESTAMP

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX

cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
    """Coder descriptor for LOCAL_ZONED_TIMESTAMP fields.

    Extends the timestamp coder with the timezone object kept for the row coder.
    """

    def __init__(self, precision, timezone):
        # The base initializer derives is_compact from the precision.
        super(LocalZonedTimestampCoderImpl, self).__init__(precision)
        self.timezone = timezone

    cpdef TypeName type_name(self):
        """Return the LOCAL_ZONED_TIMESTAMP type tag."""
        return LOCAL_ZONED_TIMESTAMP

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX

cdef class ArrayCoderImpl(BaseCoder):
    """Coder descriptor for ARRAY fields; holds the coder of the elements."""

    def __cinit__(self, elem_coder):
        self.elem_coder = elem_coder

    cpdef TypeName type_name(self):
        """Return the ARRAY type tag."""
        return ARRAY

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX

cdef class MapCoderImpl(BaseCoder):
    """Coder descriptor for MAP fields; holds the coders of keys and values."""

    def __cinit__(self, key_coder, value_coder):
        self.key_coder = key_coder
        self.value_coder = value_coder

    cpdef TypeName type_name(self):
        """Return the MAP type tag."""
        return MAP

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX

cdef class RowCoderImpl(BaseCoder):
    """Coder descriptor for nested ROW fields; holds one coder per field."""

    def __cinit__(self, field_coders):
        self.field_coders = field_coders

    cpdef TypeName type_name(self):
        """Return the ROW type tag."""
        return ROW

    cpdef CoderType coder_type(self):
        """Return COMPLEX: the value needs this coder's parameters to be converted."""
        return COMPLEX
Loading

0 comments on commit 5456360

Please sign in to comment.