Skip to content

Commit

Permalink
[FLINK-30366][python] Fix Python Group Agg failed in cleaning the idl…
Browse files Browse the repository at this point in the history
…e state

This closes apache#21488.
  • Loading branch information
HuangXingBo committed Dec 12, 2022
1 parent cfc2ec8 commit 72a7031
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 4 deletions.
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/coder_impl_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ cdef class FlattenRowCoderImpl(FieldCoderImpl):
cdef size_t i
cdef FieldCoderImpl field_coder

list_value = <list> value
list_value = <list?> value

# encode mask value
self._mask_utils.write_mask(list_value, 0, out_stream)
Expand Down
2 changes: 2 additions & 0 deletions flink-python/pyflink/fn_execution/coder_impl_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ def __init__(self, field_coders: List[FieldCoderImpl]):
self._mask_utils = MaskUtils(self._field_count)

def encode_to_stream(self, value, out_stream: OutputStream):
if not isinstance(value, List):
raise TypeError('Expected list, got {0}'.format(type(value)))
# encode mask value
self._mask_utils.write_mask(value, 0, out_stream)

Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ cdef class GroupAggFunctionBase:

cpdef void on_timer(self, InternalRow key):
if self.state_cleaning_enabled:
self.state_backend.set_current_key(key)
self.state_backend.set_current_key(list(key.values))
accumulator_state = self.state_backend.get_value_state(
"accumulators", self.state_value_coder)
accumulator_state.clear()
Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/fn_execution/table/aggregate_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,9 @@ def process_element(self, input_data: Row):
except KeyError:
self.buffer[tuple(key)] = [input_data]

def on_timer(self, key):
def on_timer(self, key: Row):
if self.state_cleaning_enabled:
self.state_backend.set_current_key(key)
self.state_backend.set_current_key(list(key._values))
accumulator_state = self.state_backend.get_value_state(
"accumulators", self.state_value_coder)
accumulator_state.clear()
Expand Down
6 changes: 6 additions & 0 deletions flink-python/pyflink/fn_execution/tests/test_coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ def test_window_coder(self):
coder = CountWindowCoder()
self.check_coder(coder, CountWindow(100))

def test_coder_with_unmatched_type(self):
from pyflink.common import Row
coder = FlattenRowCoder([BigIntCoder()])
with self.assertRaises(TypeError, msg='Expected list, got Row'):
self.check_coder(coder, Row(1))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit 72a7031

Please sign in to comment.