Skip to content

Commit

Permalink
[FLINK-29461][python] Make the test_process_function more stable
Browse files Browse the repository at this point in the history
This closes apache#21491.
  • Loading branch information
HuangXingBo committed Dec 13, 2022
1 parent ba6c0c7 commit 4df6a39
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,24 +980,23 @@ class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
yield "current timestamp: {}, current watermark: {}, current_value: {}"\
.format(str(current_timestamp), str(current_watermark), str(value))
yield "current timestamp: {}, current_value: {}"\
.format(str(current_timestamp), str(value))

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
.process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
self.env.execute('test process function')
results = self.test_sink.get_results()
expected = ["current timestamp: 1603708211000, current watermark: "
"-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
"current timestamp: 1603708224000, current watermark: "
"-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
"current timestamp: 1603708226000, current watermark: "
"-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
"current timestamp: 1603708289000, current watermark: "
"-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
expected = ["current timestamp: 1603708211000, "
"current_value: Row(f0=1, f1='1603708211000')",
"current timestamp: 1603708224000, "
"current_value: Row(f0=2, f1='1603708224000')",
"current timestamp: 1603708226000, "
"current_value: Row(f0=3, f1='1603708226000')",
"current timestamp: 1603708289000, "
"current_value: Row(f0=4, f1='1603708289000')"]
self.assert_equals_sorted(expected, results)

def test_process_side_output(self):
Expand Down

0 comments on commit 4df6a39

Please sign in to comment.