From 5376254c49555c7211f17ab5c4cfda6cb0ed7971 Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Tue, 8 Dec 2020 12:53:13 +0800 Subject: [PATCH] [FLINK-20525][python] Fix StreamArrowPythonGroupWindowAggregateFunctionOperator incorrect handling of rowtime and proctime fields This closes #14327. --- .../pyflink/table/tests/test_pandas_udaf.py | 16 +++++++++------- ...thonGroupWindowAggregateFunctionOperator.java | 4 +++- .../StreamExecPythonGroupWindowAggregate.scala | 3 ++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py index b104325a9a830..e360f629a01bb 100644 --- a/flink-python/pyflink/table/tests/test_pandas_udaf.py +++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py @@ -442,24 +442,26 @@ def test_tumbling_group_window_over_time(self): t = self.t_env.from_path("source_table") table_sink = source_sink_utils.TestAppendSink( - ['a', 'b', 'c', 'd'], + ['a', 'b', 'c', 'd', 'e'], [ DataTypes.TINYINT(), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(3), DataTypes.FLOAT()]) self.t_env.register_table_sink("Results", table_sink) t.window(Tumble.over("1.hours").on("rowtime").alias("w")) \ .group_by("a, b, w") \ - .select("a, w.start, w.end, mean_udaf(c) as b") \ + .select("a, w.start, w.end, w.rowtime, mean_udaf(c) as b") \ .execute_insert("Results") \ .wait() actual = source_sink_utils.results() - self.assert_equals(actual, - ["1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.5", - "1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,8.0", - "2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0", - "3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0"]) + self.assert_equals(actual, [ + "1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.5", + "1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,2018-03-11 04:59:59.999,8.0", + "2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0", + "3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0", + ]) os.remove(source_path) def test_tumbling_group_window_over_count(self): diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java index 6c458c102aef4..29a77a6624b43 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.java @@ -380,8 +380,10 @@ private void setWindowProperty(W currentWindow) { windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) currentWindow).getEnd())); break; case 2: - windowProperty.setField(i, TimestampData.fromEpochMillis(currentWindow.maxTimestamp())); + windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) currentWindow).getEnd() - 1)); break; + case 3: + windowProperty.setField(i, TimestampData.fromEpochMillis(-1)); } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupWindowAggregate.scala index 7de00c984de33..54479ba2d4c8d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupWindowAggregate.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.functions.python.PythonFunctionInfo import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.delegation.StreamPlanner -import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} +import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonGroupWindowAggregate.ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME @@ -219,6 +219,7 @@ class StreamExecPythonGroupWindowAggregate( case PlannerWindowStart(_) => 0 case PlannerWindowEnd(_) => 1 case PlannerRowtimeAttribute(_) => 2 + case PlannerProctimeAttribute(_) => 3 } }.toArray