[FLINK-19372][python] Support Pandas Batch Over Window Aggregation
This closes apache#13475.
HuangXingBo authored and dianfu committed Sep 25, 2020
1 parent 8525c5c commit 749f70d
Showing 15 changed files with 1,120 additions and 302 deletions.
35 changes: 32 additions & 3 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
@@ -578,23 +578,52 @@ def encode_to_stream(self, iter_cols, out_stream, nested):

def decode_from_stream(self, in_stream, nested):
while in_stream.size() > 0:
yield self._decode_one_batch_from_stream(in_stream)
yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())

@staticmethod
def _load_from_stream(stream):
reader = pa.ipc.open_stream(stream)
for batch in reader:
yield batch

def _decode_one_batch_from_stream(self, in_stream: create_InputStream) -> List:
self._resettable_io.set_input_bytes(in_stream.read_all(True))
def _decode_one_batch_from_stream(self, in_stream: create_InputStream, size: int) -> List:
self._resettable_io.set_input_bytes(in_stream.read(size))
# there is only one arrow batch in the underlying input stream
return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])

def __repr__(self):
return 'ArrowCoderImpl[%s]' % self._schema


class OverWindowArrowCoderImpl(StreamCoderImpl):
def __init__(self, arrow_coder):
self._arrow_coder = arrow_coder
self._int_coder = IntCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._arrow_coder.encode_to_stream(value, stream, nested)

def decode_from_stream(self, in_stream, nested):
while in_stream.size():
remaining_size = in_stream.read_var_int64()
window_num = self._int_coder.decode_from_stream(in_stream, nested)
remaining_size -= 4
window_boundaries_and_arrow_data = []
for _ in range(window_num):
window_size = self._int_coder.decode_from_stream(in_stream, nested)
remaining_size -= 4
window_boundaries_and_arrow_data.append(
[self._int_coder.decode_from_stream(in_stream, nested)
for _ in range(window_size)])
remaining_size -= 4 * window_size
window_boundaries_and_arrow_data.append(
self._arrow_coder._decode_one_batch_from_stream(in_stream, remaining_size))
yield window_boundaries_and_arrow_data

def __repr__(self):
return 'OverWindowArrowCoderImpl[%s]' % self._arrow_coder


class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
def __init__(self, value_coder):
self._value_coder = value_coder
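For orientation, the frame that OverWindowArrowCoderImpl.decode_from_stream consumes is laid out as a var-int total length, a 4-byte window count, then one 4-byte boundary count plus that many 4-byte boundaries per bounded range window, and finally a single serialized arrow batch filling the remaining bytes. A minimal sketch of that framing, assuming big-endian 4-byte ints as written by IntCoderImpl (illustrative only, not the actual Java-side encoder):

```python
import io
import struct

def frame_payload(window_boundaries, arrow_bytes):
    """Builds the payload that follows the var-int length prefix.

    window_boundaries: one list of int offsets per bounded range window;
    arrow_bytes: exactly one serialized arrow record batch.
    """
    buf = io.BytesIO()
    buf.write(struct.pack('>i', len(window_boundaries)))   # window_num
    for boundaries in window_boundaries:
        buf.write(struct.pack('>i', len(boundaries)))      # window_size
        for boundary in boundaries:
            buf.write(struct.pack('>i', boundary))         # boundary offset
    buf.write(arrow_bytes)                                 # the arrow batch
    return buf.getvalue()  # the writer prefixes this with its var-int length
```

Decoding mirrors this layout: remaining_size starts at the var-int value, drops by 4 for the window count and by 4 + 4 * window_size per window, and whatever is left is handed to the wrapped arrow coder as the batch.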
24 changes: 24 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_coders.py
@@ -215,6 +215,30 @@ def __repr__(self):
return 'ArrowCoder[%s]' % self._schema


class OverWindowArrowCoder(FastCoder):
"""
Coder for batch pandas over window aggregation.
"""
def __init__(self, arrow_coder):
self._arrow_coder = arrow_coder

def _create_impl(self):
return beam_coder_impl_slow.OverWindowArrowCoderImpl(
self._arrow_coder._create_impl())

def to_type_hint(self):
return typehints.List

@Coder.register_urn(coders.FLINK_OVER_WINDOW_ARROW_CODER_URN, flink_fn_execution_pb2.Schema)
def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
return OverWindowArrowCoder(
ArrowCoder._pickle_from_runner_api_parameter(
schema_proto, unused_components, unused_context))

def __repr__(self):
return 'OverWindowArrowCoder[%s]' % self._arrow_coder


class BeamDataStreamStatelessMapCoder(FastCoder):

def __init__(self, field_coder):
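For context, coders travel between the Flink runner and the Beam SDK harness as (URN, payload) pairs, and Coder.register_urn above ties FLINK_OVER_WINDOW_ARROW_CODER_URN plus its Schema payload to a factory that rebuilds the Python-side coder. A simplified sketch of that registry pattern (the names below are illustrative, not Beam's actual internals):

```python
# Illustrative registry: maps a URN to (factory, payload protobuf class).
_CODER_FACTORIES = {}

def register_urn(urn, payload_type):
    def decorator(factory):
        _CODER_FACTORIES[urn] = (factory, payload_type)
        return factory
    return decorator

def coder_from_runner_api(urn, serialized_payload, components, context):
    factory, payload_type = _CODER_FACTORIES[urn]
    payload = payload_type()
    payload.ParseFromString(serialized_payload)  # deserialize the proto
    return factory(payload, components, context)
```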
flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
@@ -45,3 +45,10 @@ cdef class DataStreamStatelessFunctionOperation(BeamStatelessFunctionOperation):

cdef class PandasAggregateFunctionOperation(BeamStatelessFunctionOperation):
pass

cdef class PandasBatchOverWindowAggregateFunctionOperation(BeamStatelessFunctionOperation):
cdef list windows
cdef list bounded_range_window_index
cdef list is_bounded_range_window
cdef list window_indexes
cdef list mapper
117 changes: 115 additions & 2 deletions flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -45,9 +45,11 @@ cdef class BeamStatelessFunctionOperation(Operation):
super(BeamStatelessFunctionOperation, self).__init__(name, spec, counter_factory, sampler)
self.consumer = consumers['output'][0]
self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder
from pyflink.fn_execution.beam.beam_coder_impl_slow import ArrowCoderImpl
from pyflink.fn_execution.beam.beam_coder_impl_slow import ArrowCoderImpl, \
OverWindowArrowCoderImpl

if isinstance(self._value_coder_impl, ArrowCoderImpl):
if isinstance(self._value_coder_impl, ArrowCoderImpl) or \
isinstance(self._value_coder_impl, OverWindowArrowCoderImpl):
self._is_python_coder = True
else:
self._is_python_coder = False
@@ -192,6 +194,108 @@ cdef class PandasAggregateFunctionOperation(BeamStatelessFunctionOperation):
return generate_func, user_defined_funcs


cdef class PandasBatchOverWindowAggregateFunctionOperation(BeamStatelessFunctionOperation):
def __init__(self, name, spec, counter_factory, sampler, consumers):
super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
name, spec, counter_factory, sampler, consumers)
self.windows = [window for window in self.spec.serialized_fn.windows]
self.bounded_range_window_index = [-1 for _ in range(len(self.windows))]
self.is_bounded_range_window = []
window_types = flink_fn_execution_pb2.OverWindow
bounded_range_window_nums = 0
for i, window in enumerate(self.windows):
window_type = window.window_type
if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
window_type is window_types.RANGE_SLIDING):
self.bounded_range_window_index[i] = bounded_range_window_nums
self.is_bounded_range_window.append(True)
bounded_range_window_nums += 1
else:
self.is_bounded_range_window.append(False)

def generate_func(self, udfs):
user_defined_funcs = []
self.window_indexes = []
self.mapper = []
for udf in udfs:
pandas_agg_function, variable_dict, user_defined_func, window_index = \
operation_utils.extract_over_window_user_defined_function(udf)
user_defined_funcs.extend(user_defined_func)
self.window_indexes.append(window_index)
self.mapper.append(eval('lambda value: %s' % pandas_agg_function, variable_dict))
return self.wrapped_over_window_function, user_defined_funcs

def wrapped_over_window_function(self, it):
import pandas as pd
OverWindow = flink_fn_execution_pb2.OverWindow
for boundaries_series in it:
input_series = boundaries_series[len(boundaries_series) - 1]
input_cnt = len(input_series[0])
results = []
# loop over every agg func
for i in range(len(self.window_indexes)):
window_index = self.window_indexes[i]
window = self.windows[window_index]
window_type = window.window_type
func = self.mapper[i]
result = []
if self.is_bounded_range_window[window_index]:
window_boundaries = boundaries_series[
self.bounded_range_window_index[window_index]]
if window_type is OverWindow.RANGE_UNBOUNDED_PRECEDING:
# range unbounded preceding window
for j in range(input_cnt):
end = window_boundaries[j]
series_slices = [s.iloc[:end] for s in input_series]
result.append(func(series_slices))
elif window_type is OverWindow.RANGE_UNBOUNDED_FOLLOWING:
# range unbounded following window
for j in range(input_cnt):
start = window_boundaries[j]
series_slices = [s.iloc[start:] for s in input_series]
result.append(func(series_slices))
else:
# range sliding window
for j in range(input_cnt):
start = window_boundaries[j * 2]
end = window_boundaries[j * 2 + 1]
series_slices = [s.iloc[start:end] for s in input_series]
result.append(func(series_slices))
else:
# unbounded range window or unbounded row window
if (window_type is OverWindow.RANGE_UNBOUNDED) or (
window_type is OverWindow.ROW_UNBOUNDED):
series_slices = [s.iloc[:] for s in input_series]
func_result = func(series_slices)
result = [func_result for _ in range(input_cnt)]
elif window_type is OverWindow.ROW_UNBOUNDED_PRECEDING:
# row unbounded preceding window
window_end = window.upper_boundary
for j in range(input_cnt):
end = min(j + window_end + 1, input_cnt)
series_slices = [s.iloc[: end] for s in input_series]
result.append(func(series_slices))
elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
# row unbounded following window
window_start = window.lower_boundary
for j in range(input_cnt):
start = max(j + window_start, 0)
series_slices = [s.iloc[start: input_cnt] for s in input_series]
result.append(func(series_slices))
else:
# row sliding window
window_start = window.lower_boundary
window_end = window.upper_boundary
for j in range(input_cnt):
start = max(j + window_start, 0)
end = min(j + window_end + 1, input_cnt)
series_slices = [s.iloc[start: end] for s in input_series]
result.append(func(series_slices))
results.append(pd.Series(result))
yield results


@bundle_processor.BeamTransformFactory.register_urn(
operation_utils.SCALAR_FUNCTION_URN, flink_fn_execution_pb2.UserDefinedFunctions)
def create_scalar_function(factory, transform_id, transform_proto, parameter, consumers):
@@ -216,6 +320,15 @@ def create_pandas_aggregate_function(factory, transform_id, transform_proto, parameter, consumers):
return _create_user_defined_function_operation(
factory, transform_proto, consumers, parameter, PandasAggregateFunctionOperation)

@bundle_processor.BeamTransformFactory.register_urn(
operation_utils.PANDAS_BATCH_OVER_WINDOW_AGGREGATE_FUNCTION_URN,
flink_fn_execution_pb2.UserDefinedFunctions)
def create_pandas_over_window_aggregate_function(
factory, transform_id, transform_proto, parameter, consumers):
return _create_user_defined_function_operation(
factory, transform_proto, consumers, parameter,
PandasBatchOverWindowAggregateFunctionOperation)

def _create_user_defined_function_operation(factory, transform_proto, consumers, udfs_proto,
operation_cls):
output_tags = list(transform_proto.outputs.keys())
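Every branch of wrapped_over_window_function above slices the input series per row with iloc and applies the generated agg lambda to the slices. A standalone sketch of the row sliding branch, assuming lower_boundary is negative for preceding rows (as the max/min clamping implies) and substituting mean() for the generated pandas UDAF:

```python
import pandas as pd

def row_sliding_mean(series, lower_boundary, upper_boundary):
    # For each row j, aggregate rows [j + lower_boundary, j + upper_boundary],
    # clamped to the bounds of the batch.
    input_cnt = len(series)
    result = []
    for j in range(input_cnt):
        start = max(j + lower_boundary, 0)
        end = min(j + upper_boundary + 1, input_cnt)
        result.append(series.iloc[start:end].mean())
    return pd.Series(result)

# ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [1, 2, 3, 4]:
# row_sliding_mean(pd.Series([1, 2, 3, 4]), -1, 0) -> [1.0, 1.5, 2.5, 3.5]
```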
116 changes: 116 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -172,6 +172,112 @@ def generate_func(self, udfs):
return lambda it: map(mapper, it), user_defined_funcs


class PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
def __init__(self, name, spec, counter_factory, sampler, consumers):
super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
name, spec, counter_factory, sampler, consumers)
self.windows = [window for window in self.spec.serialized_fn.windows]
# the index of each window among all the bounded range windows (-1 for others)
self.bounded_range_window_index = [-1 for _ in range(len(self.windows))]
# whether the window at the given position is a bounded range window
self.is_bounded_range_window = []
window_types = flink_fn_execution_pb2.OverWindow

bounded_range_window_nums = 0
for i, window in enumerate(self.windows):
window_type = window.window_type
if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
window_type is window_types.RANGE_SLIDING):
self.bounded_range_window_index[i] = bounded_range_window_nums
self.is_bounded_range_window.append(True)
bounded_range_window_nums += 1
else:
self.is_bounded_range_window.append(False)

def generate_func(self, udfs):
user_defined_funcs = []
self.window_indexes = []
self.mapper = []
for udf in udfs:
pandas_agg_function, variable_dict, user_defined_func, window_index = \
operation_utils.extract_over_window_user_defined_function(udf)
user_defined_funcs.extend(user_defined_func)
self.window_indexes.append(window_index)
self.mapper.append(eval('lambda value: %s' % pandas_agg_function, variable_dict))
return self.wrapped_over_window_function, user_defined_funcs

def wrapped_over_window_function(self, it):
import pandas as pd
OverWindow = flink_fn_execution_pb2.OverWindow
for boundaries_series in it:
input_series = boundaries_series[-1]
# the number of rows in the arrow batch
input_cnt = len(input_series[0])
results = []
# loop over every agg func
for i in range(len(self.window_indexes)):
window_index = self.window_indexes[i]
# the over window that the agg function belongs to
window = self.windows[window_index]
window_type = window.window_type
func = self.mapper[i]
result = []
if self.is_bounded_range_window[window_index]:
window_boundaries = boundaries_series[
self.bounded_range_window_index[window_index]]
if window_type is OverWindow.RANGE_UNBOUNDED_PRECEDING:
# range unbounded preceding window
for j in range(input_cnt):
end = window_boundaries[j]
series_slices = [s.iloc[:end] for s in input_series]
result.append(func(series_slices))
elif window_type is OverWindow.RANGE_UNBOUNDED_FOLLOWING:
# range unbounded following window
for j in range(input_cnt):
start = window_boundaries[j]
series_slices = [s.iloc[start:] for s in input_series]
result.append(func(series_slices))
else:
# range sliding window
for j in range(input_cnt):
start = window_boundaries[j * 2]
end = window_boundaries[j * 2 + 1]
series_slices = [s.iloc[start:end] for s in input_series]
result.append(func(series_slices))
else:
# unbounded range window or unbounded row window
if (window_type is OverWindow.RANGE_UNBOUNDED) or (
window_type is OverWindow.ROW_UNBOUNDED):
series_slices = [s.iloc[:] for s in input_series]
func_result = func(series_slices)
result = [func_result for _ in range(input_cnt)]
elif window_type is OverWindow.ROW_UNBOUNDED_PRECEDING:
# row unbounded preceding window
window_end = window.upper_boundary
for j in range(input_cnt):
end = min(j + window_end + 1, input_cnt)
series_slices = [s.iloc[: end] for s in input_series]
result.append(func(series_slices))
elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
# row unbounded following window
window_start = window.lower_boundary
for j in range(input_cnt):
start = max(j + window_start, 0)
series_slices = [s.iloc[start: input_cnt] for s in input_series]
result.append(func(series_slices))
else:
# row sliding window
window_start = window.lower_boundary
window_end = window.upper_boundary
for j in range(input_cnt):
start = max(j + window_start, 0)
end = min(j + window_end + 1, input_cnt)
series_slices = [s.iloc[start: end] for s in input_series]
result.append(func(series_slices))
results.append(pd.Series(result))
yield results

@bundle_processor.BeamTransformFactory.register_urn(
operation_utils.SCALAR_FUNCTION_URN, flink_fn_execution_pb2.UserDefinedFunctions)
def create_scalar_function(factory, transform_id, transform_proto, parameter, consumers):
@@ -201,6 +307,16 @@ def create_pandas_aggregate_function(factory, transform_id, transform_proto, parameter, consumers):
factory, transform_proto, consumers, parameter, PandasAggregateFunctionOperation)


@bundle_processor.BeamTransformFactory.register_urn(
operation_utils.PANDAS_BATCH_OVER_WINDOW_AGGREGATE_FUNCTION_URN,
flink_fn_execution_pb2.UserDefinedFunctions)
def create_pandas_over_window_aggregate_function(
factory, transform_id, transform_proto, parameter, consumers):
return _create_user_defined_function_operation(
factory, transform_proto, consumers, parameter,
PandasBatchOverWindowAggregateFunctionOperation)


def _create_user_defined_function_operation(factory, transform_proto, consumers, udfs_proto,
operation_cls):
output_tags = list(transform_proto.outputs.keys())
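End to end, these operations implement the feature's execution side; a hedged sketch of how it surfaces to users, with Table API names as of PyFlink 1.12 (they may differ from this commit's intermediate state):

```python
import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udaf

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(env_settings)

@udaf(result_type=DataTypes.DOUBLE(), func_type='pandas')
def mean_udaf(v: pd.Series) -> float:
    return v.mean()

t_env.create_temporary_function("mean_udaf", mean_udaf)
t_env.create_temporary_view(
    "T", t_env.from_elements([(1, 2.0), (1, 4.0), (2, 3.0)], ['a', 'b']))

# A bounded ROWS over-window evaluated by the pandas batch operator.
result = t_env.sql_query("""
    SELECT a,
           mean_udaf(b) OVER (
               PARTITION BY a ORDER BY a
               ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS m
    FROM T
""")
```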
1 change: 1 addition & 0 deletions flink-python/pyflink/fn_execution/coders.py
@@ -40,6 +40,7 @@
FLINK_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1"
FLINK_MAP_FUNCTION_DATA_STREAM_CODER_URN = "flink:coder:datastream:map_function:v1"
FLINK_FLAT_MAP_FUNCTION_DATA_STREAM_CODER_URN = "flink:coder:datastream:flatmap_function:v1"
FLINK_OVER_WINDOW_ARROW_CODER_URN = "flink:coder:schema:batch_over_window:arrow:v1"


class BaseCoder(ABC):
7 changes: 7 additions & 0 deletions flink-python/pyflink/fn_execution/operation_utils.py
@@ -29,6 +29,8 @@
TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
DATA_STREAM_STATELESS_FUNCTION_URN = "flink:transform:datastream_stateless_function:v1"
PANDAS_AGGREGATE_FUNCTION_URN = "flink:transform:aggregate_function:arrow:v1"
PANDAS_BATCH_OVER_WINDOW_AGGREGATE_FUNCTION_URN = \
"flink:transform:batch_over_window_aggregate_function:arrow:v1"

_func_num = 0
_constant_num = 0
@@ -39,6 +41,11 @@ def wrap_pandas_result(it):
return [pd.Series([result]) for result in it]


def extract_over_window_user_defined_function(user_defined_function_proto):
window_index = user_defined_function_proto.window_index
return (*extract_user_defined_function(user_defined_function_proto, True), window_index)


def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False)\
-> Tuple[str, Dict, List]:
"""
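The new extract_over_window_user_defined_function returns, in addition to the window index, an expression string plus a variable dict that generate_func later closes together via eval. A simplified illustration of that mechanism (the expression and names here are made up):

```python
import pandas as pd

# e.g. the proto might render to "f0(value[1])": apply UDAF f0 to column 1.
pandas_agg_function = 'f0(value[1])'
variable_dict = {'f0': lambda series: series.sum()}
mapper = eval('lambda value: %s' % pandas_agg_function, variable_dict)

series_slices = [pd.Series([1, 2]), pd.Series([10, 20])]
assert mapper(series_slices) == 30  # f0 applied to value[1]
```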