Skip to content

Commit

Permalink
[FLINK-21509][python] add 'withProcessingTime()' method call when cre…
Browse files Browse the repository at this point in the history
…ating proctime slide window assigner in 'StreamExecPythonGroupWindowAggregate' class

This closes apache#15027
  • Loading branch information
WeiZhong94 committed Mar 2, 2021
1 parent bd63e26 commit f00da55
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
30 changes: 30 additions & 0 deletions flink-python/pyflink/table/tests/test_pandas_udaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,36 @@ def test_sliding_group_window_over_time(self):
"+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0]"])
os.remove(source_path)

def test_sliding_group_window_over_proctime(self):
self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
from pyflink.table.window import Slide
self.t_env.register_function("mean_udaf", mean_udaf)

source_table = """
create table source_table(
a INT,
proctime as PROCTIME()
) with(
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.a.kind' = 'sequence',
'fields.a.start' = '1',
'fields.a.end' = '10'
)
"""
self.t_env.execute_sql(source_table)
t = self.t_env.from_path("source_table")
iterator = t.select("a, proctime") \
.window(Slide.over("1.seconds").every("1.seconds").on("proctime").alias("w")) \
.group_by("a, w") \
.select("mean_udaf(a) as b, w.start").execute().collect()
result = [i for i in iterator]
# if the WindowAssigner.isEventTime() does not return false,
# the w.start would be 1970-01-01
# TODO: After fixing the TimeZone problem of window with processing time (will be fixed in
# FLIP-162), we should replace it with a more accurate assertion.
self.assertTrue(result[0][1].year > 1970)

def test_sliding_group_window_over_count(self):
self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# create source file path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ private Tuple2<WindowAssigner<?>, Trigger<?>> generateWindowAssignerAndTrigger()
ValueLiteralExpression size = slidingWindow.size();
ValueLiteralExpression slide = slidingWindow.slide();
if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
windowAssiger = SlidingWindowAssigner.of(toDuration(size), toDuration(slide));
windowAssiger =
SlidingWindowAssigner.of(toDuration(size), toDuration(slide))
.withProcessingTime();
trigger = ProcessingTimeTriggers.afterEndOfWindow();
} else if (isRowtimeAttribute(timeField) && hasTimeIntervalType(size)) {
windowAssiger = SlidingWindowAssigner.of(toDuration(size), toDuration(slide));
Expand Down

0 comments on commit f00da55

Please sign in to comment.