Skip to content

Commit

Permalink
[FLINK-20525][python] Fix StreamArrowPythonGroupWindowAggregateFunctionOperator incorrect handling of rowtime and proctime fields
Browse files Browse the repository at this point in the history

This closes apache#14327.
  • Loading branch information
HuangXingBo authored and dianfu committed Dec 9, 2020
1 parent 6727956 commit 5376254
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
16 changes: 9 additions & 7 deletions flink-python/pyflink/table/tests/test_pandas_udaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,24 +442,26 @@ def test_tumbling_group_window_over_time(self):
t = self.t_env.from_path("source_table")

table_sink = source_sink_utils.TestAppendSink(
['a', 'b', 'c', 'd'],
['a', 'b', 'c', 'd', 'e'],
[
DataTypes.TINYINT(),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(3),
DataTypes.FLOAT()])
self.t_env.register_table_sink("Results", table_sink)
t.window(Tumble.over("1.hours").on("rowtime").alias("w")) \
.group_by("a, b, w") \
.select("a, w.start, w.end, mean_udaf(c) as b") \
.select("a, w.start, w.end, w.rowtime, mean_udaf(c) as b") \
.execute_insert("Results") \
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
["1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.5",
"1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,8.0",
"2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0",
"3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0"])
self.assert_equals(actual, [
"1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.5",
"1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,2018-03-11 04:59:59.999,8.0",
"2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0",
"3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0",
])
os.remove(source_path)

def test_tumbling_group_window_over_count(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,10 @@ private void setWindowProperty(W currentWindow) {
windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) currentWindow).getEnd()));
break;
case 2:
windowProperty.setField(i, TimestampData.fromEpochMillis(currentWindow.maxTimestamp()));
windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) currentWindow).getEnd() - 1));
break;
case 3:
windowProperty.setField(i, TimestampData.fromEpochMillis(-1));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.table.functions.python.PythonFunctionInfo
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart}
import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart}
import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonGroupWindowAggregate.ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
Expand Down Expand Up @@ -219,6 +219,7 @@ class StreamExecPythonGroupWindowAggregate(
case PlannerWindowStart(_) => 0
case PlannerWindowEnd(_) => 1
case PlannerRowtimeAttribute(_) => 2
case PlannerProctimeAttribute(_) => 3
}
}.toArray

Expand Down

0 comments on commit 5376254

Please sign in to comment.