From 74231f736cdb7abb06887ab93940519bda87c169 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 4 Jun 2020 12:01:52 +0800 Subject: [PATCH] [FLINK-17902][python] Support the new interfaces about temporary functions in PyFlink This closes #12476. --- .../pyflink/table/table_environment.py | 235 ++++++++++++++++++ .../table/tests/test_table_environment_api.py | 34 +++ flink-python/pyflink/table/tests/test_udf.py | 14 ++ 3 files changed, 283 insertions(+) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index e63dd01e7063a..940ee7ae14c6a 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -37,6 +37,7 @@ from pyflink.table.table_result import TableResult from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \ _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema +from pyflink.table.udf import UserDefinedFunctionWrapper from pyflink.util import utils from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \ to_j_explain_detail_arr @@ -171,6 +172,232 @@ def unload_module(self, module_name: str): """ self._j_tenv.unloadModule(module_name) + def create_java_temporary_system_function(self, name: str, function_class_name: str): + """ + Registers a java user defined function class as a temporary system function. + + Compared to .. seealso:: :func:`create_java_temporary_function`, system functions are + identified by a global name that is independent of the current catalog and current + database. Thus, this method allows to extend the set of built-in system functions like + TRIM, ABS, etc. + + Temporary functions can shadow permanent ones. If a permanent function under a given name + exists, it will be inaccessible in the current session. To make the permanent function + available again one can drop the corresponding temporary system function. + + Example: + :: + + >>> table_env.create_java_temporary_system_function("func", + ... "java.user.defined.function.class.name") + + :param name: The name under which the function will be registered globally. + :param function_class_name: The java full qualified class name of the function class + containing the implementation. The function must have a + public no-argument constructor and can be founded in current + Java classloader. + + .. versionadded:: 1.12.0 + """ + gateway = get_gateway() + java_function = gateway.jvm.Thread.currentThread().getContextClassLoader() \ + .loadClass(function_class_name) + self._j_tenv.createTemporarySystemFunction(name, java_function) + + def create_temporary_system_function(self, name: str, + function: UserDefinedFunctionWrapper): + """ + Registers a python user defined function class as a temporary system function. + + Compared to .. seealso:: :func:`create_temporary_function`, system functions are identified + by a global name that is independent of the current catalog and current database. Thus, + this method allows to extend the set of built-in system functions like TRIM, ABS, etc. + + Temporary functions can shadow permanent ones. If a permanent function under a given name + exists, it will be inaccessible in the current session. To make the permanent function + available again one can drop the corresponding temporary system function. + + Example: + :: + + >>> table_env.create_temporary_system_function( + ... "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())) + + >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], + ... result_type=DataTypes.BIGINT()) + ... def add(i, j): + ... return i + j + >>> table_env.create_temporary_system_function("add", add) + + >>> class SubtractOne(ScalarFunction): + ... def eval(self, i): + ... return i - 1 + >>> table_env.create_temporary_system_function( + ... "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())) + + :param name: The name under which the function will be registered globally. + :param function: The function class containing the implementation. The function must have a + public no-argument constructor and can be founded in current Java + classloader. + + .. versionadded:: 1.12.0 + """ + java_function = function.java_user_defined_function() + self._j_tenv.createTemporarySystemFunction(name, java_function) + + def drop_temporary_system_function(self, name: str) -> bool: + """ + Drops a temporary system function registered under the given name. + + If a permanent function with the given name exists, it will be used from now on for any + queries that reference this name. + + :param name: The name under which the function has been registered globally. + :return: true if a function existed under the given name and was removed. + + .. versionadded:: 1.12.0 + """ + return self._j_tenv.dropTemporarySystemFunction(name) + + def create_java_function(self, path: str, function_class_name: str, + ignore_if_exists: bool = None): + """ + Registers a java user defined function class as a catalog function in the given path. + + Compared to system functions with a globally defined name, catalog functions are always + (implicitly or explicitly) identified by a catalog and database. + + There must not be another function (temporary or permanent) registered under the same path. + + Example: + :: + + >>> table_env.create_java_function("func", "java.user.defined.function.class.name") + + :param path: The path under which the function will be registered. + See also the :class:`~pyflink.table.TableEnvironment` class description for + the format of the path. + :param function_class_name: The java full qualified class name of the function class + containing the implementation. The function must have a + public no-argument constructor and can be founded in current + Java classloader. + :param ignore_if_exists: If a function exists under the given path and this flag is set, + no operation is executed. An exception is thrown otherwise. + + .. versionadded:: 1.12.0 + """ + gateway = get_gateway() + java_function = gateway.jvm.Thread.currentThread().getContextClassLoader() \ + .loadClass(function_class_name) + if ignore_if_exists is None: + self._j_tenv.createFunction(path, java_function) + else: + self._j_tenv.createFunction(path, java_function, ignore_if_exists) + + def drop_function(self, path) -> bool: + """ + Drops a catalog function registered in the given path. + + :param path: The path under which the function will be registered. + See also the :class:`~pyflink.table.TableEnvironment` class description for + the format of the path. + :return: true if a function existed in the given path and was removed. + + .. versionadded:: 1.12.0 + """ + return self._j_tenv.dropFunction(path) + + def create_java_temporary_function(self, path: str, function_class_name: str): + """ + Registers a java user defined function class as a temporary catalog function. + + Compared to .. seealso:: :func:`create_java_temporary_system_function` with a globally + defined name, catalog functions are always (implicitly or explicitly) identified by a + catalog and database. + + Temporary functions can shadow permanent ones. If a permanent function under a given name + exists, it will be inaccessible in the current session. To make the permanent function + available again one can drop the corresponding temporary function. + + Example: + :: + + >>> table_env.create_java_temporary_function("func", + ... "java.user.defined.function.class.name") + + :param path: The path under which the function will be registered. + See also the :class:`~pyflink.table.TableEnvironment` class description for + the format of the path. + :param function_class_name: The java full qualified class name of the function class + containing the implementation. The function must have a + public no-argument constructor and can be founded in current + Java classloader. + + .. versionadded:: 1.12.0 + """ + gateway = get_gateway() + java_function = gateway.jvm.Thread.currentThread().getContextClassLoader() \ + .loadClass(function_class_name) + self._j_tenv.createTemporaryFunction(path, java_function) + + def create_temporary_function(self, path: str, function: UserDefinedFunctionWrapper): + """ + Registers a python user defined function class as a temporary catalog function. + + Compared to .. seealso:: :func:`create_temporary_system_function` with a globally defined + name, catalog functions are always (implicitly or explicitly) identified by a catalog and + database. + + Temporary functions can shadow permanent ones. If a permanent function under a given name + exists, it will be inaccessible in the current session. To make the permanent function + available again one can drop the corresponding temporary function. + + Example: + :: + + >>> table_env.create_temporary_function( + ... "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())) + + >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], + ... result_type=DataTypes.BIGINT()) + ... def add(i, j): + ... return i + j + >>> table_env.create_temporary_function("add", add) + + >>> class SubtractOne(ScalarFunction): + ... def eval(self, i): + ... return i - 1 + >>> table_env.create_temporary_function( + ... "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())) + + :param path: The path under which the function will be registered. + See also the :class:`~pyflink.table.TableEnvironment` class description for + the format of the path. + :param function: The function class containing the implementation. The function must have a + public no-argument constructor and can be founded in current Java + classloader. + + .. versionadded:: 1.12.0 + """ + java_function = function.java_user_defined_function() + self._j_tenv.createTemporaryFunction(path, java_function) + + def drop_temporary_function(self, path) -> bool: + """ + Drops a temporary system function registered under the given name. + + If a permanent function with the given name exists, it will be used from now on for any + queries that reference this name. + + :param path: The path under which the function will be registered. + See also the :class:`~pyflink.table.TableEnvironment` class description for + the format of the path. + :return: true if a function existed in the given path and was removed. + + .. versionadded:: 1.12.0 + """ + return self._j_tenv.dropTemporaryFunction(path) + def register_table(self, name, table): """ Registers a :class:`~pyflink.table.Table` under a unique name in the TableEnvironment's @@ -841,7 +1068,11 @@ def register_java_function(self, name, function_class_name): The function must have a public no-argument constructor and can be founded in current Java classloader. :type function_class_name: str + + .. note:: Deprecated in 1.12. Use :func:`create_java_temporary_system_function` instead. """ + warnings.warn("Deprecated in 1.12. Use :func:`create_java_temporary_system_function` " + "instead.", DeprecationWarning) gateway = get_gateway() java_function = gateway.jvm.Thread.currentThread().getContextClassLoader()\ .loadClass(function_class_name).newInstance() @@ -886,7 +1117,11 @@ def register_function(self, name, function): :type function: pyflink.table.udf.UserDefinedFunctionWrapper .. versionadded:: 1.10.0 + + .. note:: Deprecated in 1.12. Use :func:`create_temporary_system_function` instead. """ + warnings.warn("Deprecated in 1.12. Use :func:`create_temporary_system_function` " + "instead.", DeprecationWarning) java_function = function.java_user_defined_function() # this is a temporary solution and will be unified later when we use the new type # system(DataType) to replace the old type system(TypeInformation). diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 9cd283002bb04..2bf40c8170144 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -95,6 +95,23 @@ def test_unload_and_load_module(self): self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT) self.assert_equals(table_result.get_table_schema().get_field_names(), ['test_module']) + def test_create_and_drop_java_function(self): + t_env = self.t_env + + t_env.create_java_temporary_system_function( + "scalar_func", "org.apache.flink.table.expressions.utils.RichFunc0") + t_env.create_java_function( + "agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction") + t_env.create_java_temporary_function( + "table_func", "org.apache.flink.table.utils.TableFunc1") + self.assert_equals(t_env.list_user_defined_functions(), + ['scalar_func', 'agg_func', 'table_func']) + + t_env.drop_temporary_system_function("scalar_func") + t_env.drop_function("agg_func") + t_env.drop_temporary_function("table_func") + self.assert_equals(t_env.list_user_defined_functions(), []) + class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCase): @@ -683,3 +700,20 @@ def test_unload_and_load_module(self): table_result = t_env.execute_sql("select concat('unload', 'load') as test_module") self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT) self.assert_equals(table_result.get_table_schema().get_field_names(), ['test_module']) + + def test_create_and_drop_java_function(self): + t_env = self.t_env + + t_env.create_java_temporary_system_function( + "scalar_func", "org.apache.flink.table.expressions.utils.RichFunc0") + t_env.create_java_function( + "agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction") + t_env.create_java_temporary_function( + "table_func", "org.apache.flink.table.utils.TableFunc1") + self.assert_equals(t_env.list_user_defined_functions(), + ['scalar_func', 'agg_func', 'table_func']) + + t_env.drop_temporary_system_function("scalar_func") + t_env.drop_function("agg_func") + t_env.drop_temporary_function("table_func") + self.assert_equals(t_env.list_user_defined_functions(), []) diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py index bc9860eb01dab..fbe0bd8716076 100644 --- a/flink-python/pyflink/table/tests/test_udf.py +++ b/flink-python/pyflink/table/tests/test_udf.py @@ -476,6 +476,20 @@ def decimal_cut_func(decimal_param): "{1=flink, 2=pyflink},1000000000000000000.050000000000000000," "1000000000000000000.059999999999999999"]) + def test_create_and_drop_function(self): + t_env = self.t_env + + t_env.create_temporary_system_function( + "add_one_func", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())) + t_env.create_temporary_function( + "subtract_one_func", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())) + self.assert_equals(t_env.list_user_defined_functions(), + ['add_one_func', 'subtract_one_func']) + + t_env.drop_temporary_system_function("add_one_func") + t_env.drop_temporary_function("subtract_one_func") + self.assert_equals(t_env.list_user_defined_functions(), []) + # decide whether two floats are equal def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):