Skip to content

Commit

Permalink
[FLINK-17902][python] Support the new interfaces about temporary func…
Browse files Browse the repository at this point in the history
…tions in PyFlink

This closes apache#12476.
  • Loading branch information
SteNicholas authored and dianfu committed Jun 9, 2020
1 parent fd9214e commit 74231f7
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
235 changes: 235 additions & 0 deletions flink-python/pyflink/table/table_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from pyflink.table.table_result import TableResult
from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
from pyflink.table.udf import UserDefinedFunctionWrapper
from pyflink.util import utils
from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
to_j_explain_detail_arr
Expand Down Expand Up @@ -171,6 +172,232 @@ def unload_module(self, module_name: str):
"""
self._j_tenv.unloadModule(module_name)

def create_java_temporary_system_function(self, name: str, function_class_name: str):
    """
    Registers a Java user-defined function class as a temporary system function.

    Unlike :func:`create_java_temporary_function`, a system function is identified by a
    global name that is independent of the current catalog and current database. This
    method can therefore be used to extend the set of built-in system functions such as
    TRIM, ABS, etc.

    Temporary functions can shadow permanent ones. If a permanent function exists under
    the given name, it becomes inaccessible in the current session. Dropping the
    corresponding temporary system function makes the permanent function available again.

    Example:
    ::

        >>> table_env.create_java_temporary_system_function("func",
        ...     "java.user.defined.function.class.name")

    :param name: The name under which the function will be registered globally.
    :param function_class_name: The fully qualified Java class name of the function class
                                containing the implementation. The function must have a
                                public no-argument constructor and must be loadable by the
                                current Java context classloader.

    .. versionadded:: 1.12.0
    """
    # Resolve the class through the context classloader of the current JVM thread.
    class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
    function_class = class_loader.loadClass(function_class_name)
    self._j_tenv.createTemporarySystemFunction(name, function_class)

def create_temporary_system_function(self, name: str,
                                     function: UserDefinedFunctionWrapper):
    """
    Registers a Python user-defined function as a temporary system function.

    Unlike :func:`create_temporary_function`, a system function is identified by a global
    name that is independent of the current catalog and current database. This method can
    therefore be used to extend the set of built-in system functions such as TRIM, ABS,
    etc.

    Temporary functions can shadow permanent ones. If a permanent function exists under
    the given name, it becomes inaccessible in the current session. Dropping the
    corresponding temporary system function makes the permanent function available again.

    Example:
    ::

        >>> table_env.create_temporary_system_function(
        ...     "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))

        >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
        ...      result_type=DataTypes.BIGINT())
        ... def add(i, j):
        ...     return i + j
        >>> table_env.create_temporary_system_function("add", add)

        >>> class SubtractOne(ScalarFunction):
        ...     def eval(self, i):
        ...         return i - 1
        >>> table_env.create_temporary_system_function(
        ...     "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))

    :param name: The name under which the function will be registered globally.
    :param function: The Python user-defined function wrapper (for example the result of
                     ``udf(...)``) containing the implementation.

    .. versionadded:: 1.12.0
    """
    # The wrapper supplies the Java-side representation that the Java environment expects.
    j_function = function.java_user_defined_function()
    self._j_tenv.createTemporarySystemFunction(name, j_function)

def drop_temporary_system_function(self, name: str) -> bool:
    """
    Drops the temporary system function registered under the given name.

    If a permanent function with the given name exists, it will be used from now on for
    any queries that reference this name.

    :param name: The name under which the function has been registered globally.
    :return: True if a function existed under the given name and was removed.

    .. versionadded:: 1.12.0
    """
    j_tenv = self._j_tenv
    return j_tenv.dropTemporarySystemFunction(name)

def create_java_function(self, path: str, function_class_name: str,
                         ignore_if_exists: bool = None):
    """
    Registers a Java user-defined function class as a catalog function in the given path.

    In contrast to system functions, which have a globally defined name, catalog
    functions are always (implicitly or explicitly) identified by a catalog and database.

    There must not be another function (temporary or permanent) registered under the same
    path.

    Example:
    ::

        >>> table_env.create_java_function("func", "java.user.defined.function.class.name")

    :param path: The path under which the function will be registered.
                 See also the :class:`~pyflink.table.TableEnvironment` class description
                 for the format of the path.
    :param function_class_name: The fully qualified Java class name of the function class
                                containing the implementation. The function must have a
                                public no-argument constructor and must be loadable by the
                                current Java context classloader.
    :param ignore_if_exists: If a function exists under the given path and this flag is
                             set, no operation is executed. An exception is thrown
                             otherwise. When left as ``None``, the flag is not forwarded
                             and the two-argument Java ``createFunction`` is called.

    .. versionadded:: 1.12.0
    """
    # Resolve the class through the context classloader of the current JVM thread.
    class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
    function_class = class_loader.loadClass(function_class_name)
    if ignore_if_exists is not None:
        self._j_tenv.createFunction(path, function_class, ignore_if_exists)
        return
    self._j_tenv.createFunction(path, function_class)

def drop_function(self, path: str) -> bool:
    """
    Drops a catalog function registered in the given path.

    :param path: The path under which the function has been registered.
                 See also the :class:`~pyflink.table.TableEnvironment` class description
                 for the format of the path.
    :return: True if a function existed in the given path and was removed.

    .. versionadded:: 1.12.0
    """
    return self._j_tenv.dropFunction(path)

def create_java_temporary_function(self, path: str, function_class_name: str):
    """
    Registers a Java user-defined function class as a temporary catalog function.

    In contrast to :func:`create_java_temporary_system_function`, which uses a globally
    defined name, catalog functions are always (implicitly or explicitly) identified by a
    catalog and database.

    Temporary functions can shadow permanent ones. If a permanent function exists under
    the given name, it becomes inaccessible in the current session. Dropping the
    corresponding temporary function makes the permanent function available again.

    Example:
    ::

        >>> table_env.create_java_temporary_function("func",
        ...     "java.user.defined.function.class.name")

    :param path: The path under which the function will be registered.
                 See also the :class:`~pyflink.table.TableEnvironment` class description
                 for the format of the path.
    :param function_class_name: The fully qualified Java class name of the function class
                                containing the implementation. The function must have a
                                public no-argument constructor and must be loadable by the
                                current Java context classloader.

    .. versionadded:: 1.12.0
    """
    # Resolve the class through the context classloader of the current JVM thread.
    class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
    function_class = class_loader.loadClass(function_class_name)
    self._j_tenv.createTemporaryFunction(path, function_class)

def create_temporary_function(self, path: str, function: UserDefinedFunctionWrapper):
    """
    Registers a Python user-defined function as a temporary catalog function.

    In contrast to :func:`create_temporary_system_function`, which uses a globally defined
    name, catalog functions are always (implicitly or explicitly) identified by a catalog
    and database.

    Temporary functions can shadow permanent ones. If a permanent function exists under
    the given name, it becomes inaccessible in the current session. Dropping the
    corresponding temporary function makes the permanent function available again.

    Example:
    ::

        >>> table_env.create_temporary_function(
        ...     "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))

        >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
        ...      result_type=DataTypes.BIGINT())
        ... def add(i, j):
        ...     return i + j
        >>> table_env.create_temporary_function("add", add)

        >>> class SubtractOne(ScalarFunction):
        ...     def eval(self, i):
        ...         return i - 1
        >>> table_env.create_temporary_function(
        ...     "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))

    :param path: The path under which the function will be registered.
                 See also the :class:`~pyflink.table.TableEnvironment` class description
                 for the format of the path.
    :param function: The Python user-defined function wrapper (for example the result of
                     ``udf(...)``) containing the implementation.

    .. versionadded:: 1.12.0
    """
    # The wrapper supplies the Java-side representation that the Java environment expects.
    j_function = function.java_user_defined_function()
    self._j_tenv.createTemporaryFunction(path, j_function)

def drop_temporary_function(self, path: str) -> bool:
    """
    Drops a temporary catalog function registered in the given path.

    If a permanent function with the given path exists, it will be used from now on for
    any queries that reference this path.

    :param path: The path under which the function has been registered.
                 See also the :class:`~pyflink.table.TableEnvironment` class description
                 for the format of the path.
    :return: True if a function existed in the given path and was removed.

    .. versionadded:: 1.12.0
    """
    return self._j_tenv.dropTemporaryFunction(path)

def register_table(self, name, table):
"""
Registers a :class:`~pyflink.table.Table` under a unique name in the TableEnvironment's
Expand Down Expand Up @@ -841,7 +1068,11 @@ def register_java_function(self, name, function_class_name):
The function must have a public no-argument constructor and can
be founded in current Java classloader.
:type function_class_name: str
.. note:: Deprecated in 1.12. Use :func:`create_java_temporary_system_function` instead.
"""
warnings.warn("Deprecated in 1.12. Use :func:`create_java_temporary_system_function` "
"instead.", DeprecationWarning)
gateway = get_gateway()
java_function = gateway.jvm.Thread.currentThread().getContextClassLoader()\
.loadClass(function_class_name).newInstance()
Expand Down Expand Up @@ -886,7 +1117,11 @@ def register_function(self, name, function):
:type function: pyflink.table.udf.UserDefinedFunctionWrapper
.. versionadded:: 1.10.0
.. note:: Deprecated in 1.12. Use :func:`create_temporary_system_function` instead.
"""
warnings.warn("Deprecated in 1.12. Use :func:`create_temporary_system_function` "
"instead.", DeprecationWarning)
java_function = function.java_user_defined_function()
# this is a temporary solution and will be unified later when we use the new type
# system(DataType) to replace the old type system(TypeInformation).
Expand Down
34 changes: 34 additions & 0 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ def test_unload_and_load_module(self):
self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT)
self.assert_equals(table_result.get_table_schema().get_field_names(), ['test_module'])

def test_create_and_drop_java_function(self):
    """Register one Java function of each kind, check the listing, then drop them all."""
    env = self.t_env
    system_name = "scalar_func"
    catalog_name = "agg_func"
    temporary_name = "table_func"

    # One temporary system, one catalog and one temporary catalog function.
    env.create_java_temporary_system_function(
        system_name, "org.apache.flink.table.expressions.utils.RichFunc0")
    env.create_java_function(
        catalog_name, "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
    env.create_java_temporary_function(
        temporary_name, "org.apache.flink.table.utils.TableFunc1")
    registered = env.list_user_defined_functions()
    self.assert_equals(registered, [system_name, catalog_name, temporary_name])

    # Each function is removed through the drop method matching how it was created.
    env.drop_temporary_system_function(system_name)
    env.drop_function(catalog_name)
    env.drop_temporary_function(temporary_name)
    remaining = env.list_user_defined_functions()
    self.assert_equals(remaining, [])


class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCase):

Expand Down Expand Up @@ -683,3 +700,20 @@ def test_unload_and_load_module(self):
table_result = t_env.execute_sql("select concat('unload', 'load') as test_module")
self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT)
self.assert_equals(table_result.get_table_schema().get_field_names(), ['test_module'])

def test_create_and_drop_java_function(self):
    """Register one Java function of each kind, check the listing, then drop them all."""
    env = self.t_env
    system_name = "scalar_func"
    catalog_name = "agg_func"
    temporary_name = "table_func"

    # One temporary system, one catalog and one temporary catalog function.
    env.create_java_temporary_system_function(
        system_name, "org.apache.flink.table.expressions.utils.RichFunc0")
    env.create_java_function(
        catalog_name, "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
    env.create_java_temporary_function(
        temporary_name, "org.apache.flink.table.utils.TableFunc1")
    registered = env.list_user_defined_functions()
    self.assert_equals(registered, [system_name, catalog_name, temporary_name])

    # Each function is removed through the drop method matching how it was created.
    env.drop_temporary_system_function(system_name)
    env.drop_function(catalog_name)
    env.drop_temporary_function(temporary_name)
    remaining = env.list_user_defined_functions()
    self.assert_equals(remaining, [])
14 changes: 14 additions & 0 deletions flink-python/pyflink/table/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,20 @@ def decimal_cut_func(decimal_param):
"{1=flink, 2=pyflink},1000000000000000000.050000000000000000,"
"1000000000000000000.059999999999999999"])

def test_create_and_drop_function(self):
    """Register Python UDFs as temporary system/catalog functions, then drop them."""
    env = self.t_env
    system_name = "add_one_func"
    temporary_name = "subtract_one_func"

    # A lambda-based UDF as a system function, a class-based UDF as a catalog function.
    add_one = udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())
    env.create_temporary_system_function(system_name, add_one)
    subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())
    env.create_temporary_function(temporary_name, subtract_one)
    registered = env.list_user_defined_functions()
    self.assert_equals(registered, [system_name, temporary_name])

    env.drop_temporary_system_function(system_name)
    env.drop_temporary_function(temporary_name)
    remaining = env.list_user_defined_functions()
    self.assert_equals(remaining, [])


# decide whether two floats are equal
def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
Expand Down

0 comments on commit 74231f7

Please sign in to comment.