Skip to content

Commit

Permalink
[FLINK-20506][python] Support FlatMap Operation in Python Table API
Browse files Browse the repository at this point in the history
This closes apache#14352.
  • Loading branch information
HuangXingBo authored and dianfu committed Dec 10, 2020
1 parent 47ee32b commit 0a6e457
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
23 changes: 23 additions & 0 deletions flink-python/pyflink/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,29 @@ def map(self, func: Union[str, Expression]) -> 'Table':
else:
return Table(self._j_table.map(func._j_expr), self._t_env)

def flat_map(self, func: Union[str, Expression]) -> 'Table':
    """
    Performs a flatMap operation with a user-defined table function.

    A string argument is handed to the underlying Java table's ``flatMap`` unchanged;
    any other argument is expected to expose a ``_j_expr`` attribute, which is passed
    instead.

    Example:
    ::

        >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
        ... def split(x, string):
        ...     for s in string.split(","):
        ...         yield x, s
        >>> tab.flat_map(split(tab.a, tab.b))

    :param func: user-defined table function, given either as a string or as an
                 expression object.
    :return: The result table, bound to the same table environment as this table.

    .. versionadded:: 1.13.0
    """
    # Strings go through as-is; expression objects are unwrapped to their Java handle.
    j_func = func if isinstance(func, str) else func._j_expr
    return Table(self._j_table.flatMap(j_func), self._t_env)

def insert_into(self, table_path: str):
"""
Writes the :class:`~pyflink.table.Table` to a :class:`~pyflink.table.TableSink` that was
Expand Down
28 changes: 27 additions & 1 deletion flink-python/pyflink/table/tests/test_row_based_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from pyflink.common import Row
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from pyflink.table.udf import udf, udtf
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
PyFlinkBlinkStreamTableTestCase
Expand Down Expand Up @@ -78,6 +78,32 @@ def func(x, y):
actual = source_sink_utils.results()
self.assert_equals(actual, ["3,5", "3,7", "6,6", "9,8", "5,8"])

def test_flat_map(self):
    """Apply ``flat_map`` twice with a comma-splitting table function and check the sink."""
    row_type = DataTypes.ROW(
        [DataTypes.FIELD("a", DataTypes.TINYINT()),
         DataTypes.FIELD("b", DataTypes.STRING()),
         DataTypes.FIELD("c", DataTypes.INT())])
    t = self.t_env.from_elements(
        [(1, "2,3", 3), (2, "1", 3), (1, "5,6,7", 4)],
        row_type)

    sink_field_names = ['a', 'b']
    sink_field_types = [DataTypes.BIGINT(), DataTypes.STRING()]
    results_sink = source_sink_utils.TestAppendSink(sink_field_names, sink_field_types)
    self.t_env.register_table_sink("Results", results_sink)

    @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
    def split(key, text):
        # Emit one (key, piece) row per comma-separated piece of the text.
        for piece in text.split(","):
            yield key, piece

    first_pass = t.flat_map(split(t.a, t.b))
    renamed = first_pass.alias("a, b")
    second_pass = renamed.flat_map(split(t.a, t.b))
    second_pass.execute_insert("Results").wait()

    actual = source_sink_utils.results()
    self.assert_equals(actual, ["1,2", "1,3", "2,1", "1,5", "1,6", "1,7"])


class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBlinkBatchTableTestCase):
    """Subclass combining ``RowBasedOperationTests`` with ``PyFlinkBlinkBatchTableTestCase``.

    Declares no members of its own; everything is inherited from the two bases.
    """
Expand Down

0 comments on commit 0a6e457

Please sign in to comment.