diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index 1e0527d273009..dbb18bc850c5d 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 {% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in python API
+t_config = TableConfig()
 # set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
 
 # define query
 result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable",  # table name
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
 {% endhighlight %}
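For readers skimming the hunk above, the end-to-end flow after this change looks roughly like the sketch below. It is not part of the patch: only the `TableConfig` calls come from the documentation change itself, while the example data, sink class, field types, and table names are illustrative stand-ins (assuming the `from_elements` and `CsvTableSink` APIs available in the pyflink of this era).

{% highlight python %}
from datetime import timedelta

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig, CsvTableSink, DataTypes

# configure idle state retention on TableConfig (this replaces StreamQueryConfig)
t_config = TableConfig()
t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))

# the config is handed over once, when the environment is created ...
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, t_config)

# ... so emitting a Table no longer needs a per-query config object
result = table_env.from_elements([(1, "Hi"), (2, "Hello")], ["id", "greeting"])
table_env.register_table_sink(
    "outputTable",  # hypothetical sink table
    CsvTableSink(["id", "greeting"], [DataTypes.BIGINT(), DataTypes.STRING()], "/tmp/output.csv"))
result.insert_into("outputTable")
{% endhighlight %}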
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index 1e0527d273009..dbb18bc850c5d 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 {% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in python API
+t_config = TableConfig()
 # set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
 
 # define query
 result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable",  # table name
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
 {% endhighlight %}
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index ac5991dc967a2..48a150ec50bd7 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -53,7 +53,6 @@
 """
 from __future__ import absolute_import
 
-from pyflink.table.query_config import BatchQueryConfig, StreamQueryConfig
 from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
@@ -74,8 +73,6 @@
     'OverWindowedTable',
     'WindowGroupedTable',
     'TableConfig',
-    'StreamQueryConfig',
-    'BatchQueryConfig',
     'TableSink',
     'TableSource',
     'WriteMode',
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
deleted file mode 100644
index 69b6488adfe5d..0000000000000
--- a/flink-python/pyflink/table/query_config.py
+++ /dev/null
@@ -1,121 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from abc import ABCMeta
-from datetime import timedelta
-from py4j.compat import long
-
-from pyflink.java_gateway import get_gateway
-
-
-class QueryConfig(object):
-    """
-    The :class:`QueryConfig` holds parameters to configure the behavior of queries.
-    """
-
-    __metaclass__ = ABCMeta
-
-    def __init__(self, j_query_config):
-        self._j_query_config = j_query_config
-
-
-class StreamQueryConfig(QueryConfig):
-    """
-    The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
-
-    Example:
-    ::
-
-        >>> query_config = StreamQueryConfig() \\
-        ...     .with_idle_state_retention_time(datetime.timedelta(days=1),
-        ...                                     datetime.timedelta(days=3))
-        >>> table_env.sql_update("...", query_config)
-
-    """
-
-    def __init__(self, j_stream_query_config=None):
-        if j_stream_query_config is not None:
-            self._j_stream_query_config = j_stream_query_config
-        else:
-            self._j_stream_query_config = get_gateway().jvm.StreamQueryConfig()
-        super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
-
-    def with_idle_state_retention_time(self, min_time, max_time):
-        """
-        Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
-        was not updated, will be retained.
-
-        State will never be cleared until it was idle for less than the minimum time and will never
-        be kept if it was idle for more than the maximum time.
-
-        When new data arrives for previously cleaned-up state, the new data will be handled as if it
-        was the first data. This can result in previous results being overwritten.
-
-        Set to 0 (zero) to never clean-up the state.
-
-        .. note::
-
-            Cleaning up state requires additional bookkeeping which becomes less expensive for
-            larger differences of minTime and maxTime. The difference between minTime and maxTime
-            must be at least 5 minutes.
-
-        :param min_time: The minimum time interval for which idle state is retained. Set to
-                         0 (zero) to never clean-up the state.
-        :param max_time: The maximum time interval for which idle state is retained. Must be at
-                         least 5 minutes greater than minTime. Set to
-                         0 (zero) to never clean-up the state.
-        :return: :class:`StreamQueryConfig`
-        """
-        # type: (timedelta, timedelta) -> StreamQueryConfig
-        j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
-        j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
-        j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
-        self._j_stream_query_config = \
-            self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time)
-        return self
-
-    def get_min_idle_state_retention_time(self):
-        """
-        State might be cleared and removed if it was not updated for the defined period of time.
-
-        :return: The minimum time until state which was not updated will be retained.
-        """
-        # type: () -> int
-        return self._j_stream_query_config.getMinIdleStateRetentionTime()
-
-    def get_max_idle_state_retention_time(self):
-        """
-        State will be cleared and removed if it was not updated for the defined period of time.
-
-        :return: The maximum time until state which was not updated will be retained.
-        """
-        # type: () -> int
-        return self._j_stream_query_config.getMaxIdleStateRetentionTime()
-
-
-class BatchQueryConfig(QueryConfig):
-    """
-    The :class:`BatchQueryConfig` holds parameters to configure the behavior of batch queries.
-    """
-
-    def __init__(self, j_batch_query_config=None):
-        self._jvm = get_gateway().jvm
-        if j_batch_query_config is not None:
-            self._j_batch_query_config = j_batch_query_config
-        else:
-            self._j_batch_query_config = self._jvm.BatchQueryConfig()
-        super(BatchQueryConfig, self).__init__(self._j_batch_query_config)
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index d6b58647a93fe..8e0fbec93fb25 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -17,6 +17,7 @@
 ################################################################################
 import sys
 
+from py4j.compat import long
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['TableConfig']
@@ -30,11 +31,12 @@ class TableConfig(object):
     A config to define the runtime behavior of the Table API.
     """
 
-    def __init__(self):
-        self._jvm = get_gateway().jvm
-        self._j_table_config = self._jvm.TableConfig()
-        self._is_stream = None  # type: bool
-        self._parallelism = None  # type: int
+    def __init__(self, j_table_config=None):
+        gateway = get_gateway()
+        if j_table_config is None:
+            self._j_table_config = gateway.jvm.TableConfig()
+        else:
+            self._j_table_config = j_table_config
 
     def get_timezone(self):
         """
@@ -52,7 +54,7 @@ def set_timezone(self, timezone_id):
                             "GMT-8:00".
""" if timezone_id is not None and isinstance(timezone_id, (str, unicode)): - j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id) + j_timezone = get_gateway().jvm.java.util.TimeZone.getTimeZone(timezone_id) self._j_table_config.setTimeZone(j_timezone) else: raise Exception("TableConfig.timezone should be a string!") @@ -90,3 +92,144 @@ def set_max_generated_code_length(self, max_generated_code_length): self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length) else: raise Exception("TableConfig.max_generated_code_length should be a int value!") + + def set_idle_state_retention_time(self, min_time, max_time): + """ + Specifies a minimum and a maximum time interval for how long idle state, i.e., state which + was not updated, will be retained. + + State will never be cleared until it was idle for less than the minimum time and will never + be kept if it was idle for more than the maximum time. + + When new data arrives for previously cleaned-up state, the new data will be handled as if it + was the first data. This can result in previous results being overwritten. + + Set to 0 (zero) to never clean-up the state. + + Example: + :: + + >>> table_config = TableConfig() \\ + ... .set_idle_state_retention_time(datetime.timedelta(days=1), + ... datetime.timedelta(days=3)) + + .. note:: + + Cleaning up state requires additional bookkeeping which becomes less expensive for + larger differences of minTime and maxTime. The difference between minTime and maxTime + must be at least 5 minutes. + + :param min_time: The minimum time interval for which idle state is retained. Set to + 0 (zero) to never clean-up the state. + :type min_time: datetime.timedelta + :param max_time: The maximum time interval for which idle state is retained. Must be at + least 5 minutes greater than minTime. Set to + 0 (zero) to never clean-up the state. + :type max_time: datetime.timedelta + """ + j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time + j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000))) + j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000))) + self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time) + + def get_min_idle_state_retention_time(self): + """ + State might be cleared and removed if it was not updated for the defined period of time. + + :return: The minimum time until state which was not updated will be retained. + :rtype: int + """ + return self._j_table_config.getMinIdleStateRetentionTime() + + def get_max_idle_state_retention_time(self): + """ + State will be cleared and removed if it was not updated for the defined period of time. + + :return: The maximum time until state which was not updated will be retained. + :rtype: int + """ + return self._j_table_config.getMaxIdleStateRetentionTime() + + def set_decimal_context(self, precision, rounding_mode): + """ + Sets the default context for decimal division calculation. + (precision=34, rounding_mode=HALF_EVEN) by default. + + The precision is the number of digits to be used for an operation. A value of 0 indicates + that unlimited precision (as many digits as are required) will be used. Note that leading + zeros (in the coefficient of a number) are never significant. + + The rounding mode is the rounding algorithm to be used for an operation. 
It could be: + + **UP**, **DOWN**, **CEILING**, **FLOOR**, **HALF_UP**, **HALF_DOWN**, **HALF_EVEN**, + **UNNECESSARY** + + The table below shows the results of rounding input to one digit with the given rounding + mode: + + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | Input | UP | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY | + +=======+====+======+=========+=======+=========+===========+===========+=============+ + | 5.5 | 6 | 5 | 6 | 5 | 6 | 5 | 6 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 2.5 | 3 | 2 | 3 | 2 | 3 | 2 | 2 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 1.6 | 2 | 1 | 2 | 1 | 2 | 2 | 2 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 1.1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 1.0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | -1.0 | -1 | -1 | -1 | -1 | -1 | -1 | -1 | -1 | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | -1.1 | -2 | -1 | -1 | -2 | -1 | -1 | -1 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | -1.6 | -2 | -1 | -1 | -2 | -2 | -2 | -2 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 2.5 | -3 | -2 | -2 | -3 | -3 | -2 | -2 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + | 5.5 | -6 | -5 | -5 | -6 | -6 | -5 | -6 | Exception | + +-------+----+------+---------+-------+---------+-----------+-----------+-------------+ + + :param precision: The precision of the decimal context. + :type precision: int + :param rounding_mode: The rounding mode of the decimal context. + :type rounding_mode: str + """ + if rounding_mode not in ( + "UP", + "DOWN", + "CEILING", + "FLOOR", + "HALF_UP", + "HALF_DOWN", + "HALF_EVEN", + "UNNECESSARY"): + raise ValueError("Unsupported rounding_mode: %s" % rounding_mode) + gateway = get_gateway() + j_rounding_mode = getattr(gateway.jvm.java.math.RoundingMode, rounding_mode) + j_math_context = gateway.jvm.java.math.MathContext(precision, j_rounding_mode) + self._j_table_config.setDecimalContext(j_math_context) + + def get_decimal_context(self): + """ + Returns current context for decimal division calculation, + (precision=34, rounding_mode=HALF_EVEN) by default. + + .. seealso:: :func:`set_decimal_context` + + :return: the current context for decimal division calculation. + :rtype: (int, str) + """ + j_math_context = self._j_table_config.getDecimalContext() + precision = j_math_context.getPrecision() + rounding_mode = j_math_context.getRoundingMode().name() + return precision, rounding_mode + + @staticmethod + def get_default(): + """ + :return: A TableConfig object with default settings. 
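As a quick orientation for reviewers, the new `TableConfig` surface added above can be exercised roughly as follows. This is a sketch, not part of the patch: it assumes a local pyflink installation whose Java gateway can be launched, and the values noted in the comments are what the Java-side defaults imply.

{% highlight python %}
from datetime import timedelta

from pyflink.table import TableConfig

# defaults mirror the Java side: MathContext(precision=34, HALF_EVEN)
config = TableConfig.get_default()
print(config.get_decimal_context())             # (34, 'HALF_EVEN')

# retention times go in as timedeltas and come back as milliseconds
config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
assert config.get_min_idle_state_retention_time() == 12 * 3600 * 1000
assert config.get_max_idle_state_retention_time() == 24 * 3600 * 1000

# rounding modes are validated eagerly on the Python side
config.set_decimal_context(20, "HALF_UP")
assert config.get_decimal_context() == (20, "HALF_UP")
try:
    config.set_decimal_context(20, "TRUNCATE")  # not a java.math.RoundingMode name
except ValueError as e:
    print(e)                                    # Unsupported rounding_mode: TRUNCATE
{% endhighlight %}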
diff --git a/flink-python/pyflink/table/tests/test_table_config_completeness.py b/flink-python/pyflink/table/tests/test_table_config_completeness.py
new file mode 100644
index 0000000000000..9db60cf31f722
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_table_config_completeness.py
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+from pyflink.table import TableConfig
+
+
+class TableConfigCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Tests whether the Python :class:`TableConfig` is consistent with
+    Java `org.apache.flink.table.api.TableConfig`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return TableConfig
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.TableConfig"
+
+    @classmethod
+    def excluded_methods(cls):
+        # internal interfaces, no need to expose to users.
+        return {'getPlannerConfig', 'setPlannerConfig'}
+
+    @classmethod
+    def java_method_name(cls, python_method_name):
+        # Most time zone related libraries in Python use 'timezone' instead of 'time_zone'.
+        return {'get_timezone': 'get_time_zone',
+                'set_timezone': 'set_time_zone'}.get(python_method_name, python_method_name)
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3e43d5fb26188..f85d6af569a9d 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,7 +22,6 @@
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamQueryConfig
 from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
 from pyflink.table.types import DataTypes, RowType
@@ -137,14 +136,31 @@ def test_sql_update(self):
         expected = ['1,Hi,Hello', '2,Hello,Hello']
         self.assert_equals(actual, expected)
 
-    def test_query_config(self):
-        query_config = StreamQueryConfig()
+    def test_table_config(self):
 
-        query_config.with_idle_state_retention_time(
+        table_config = TableConfig.get_default()
+        table_config.set_idle_state_retention_time(
             datetime.timedelta(days=1), datetime.timedelta(days=2))
 
-        self.assertEqual(2 * 24 * 3600 * 1000, query_config.get_max_idle_state_retention_time())
-        self.assertEqual(24 * 3600 * 1000, query_config.get_min_idle_state_retention_time())
+        self.assertEqual(2 * 24 * 3600 * 1000, table_config.get_max_idle_state_retention_time())
+        self.assertEqual(24 * 3600 * 1000, table_config.get_min_idle_state_retention_time())
+
+        table_config.set_decimal_context(20, "UNNECESSARY")
+        self.assertEqual((20, "UNNECESSARY"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_EVEN")
+        self.assertEqual((20, "HALF_EVEN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_DOWN")
+        self.assertEqual((20, "HALF_DOWN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_UP")
+        self.assertEqual((20, "HALF_UP"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "FLOOR")
+        self.assertEqual((20, "FLOOR"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "CEILING")
+        self.assertEqual((20, "CEILING"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "DOWN")
+        self.assertEqual((20, "DOWN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "UP")
+        self.assertEqual((20, "UP"), table_config.get_decimal_context())
 
     def test_create_table_environment(self):
         table_config = TableConfig()
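The rounding-mode names exercised in the test above map onto `java.math.RoundingMode`. Purely as an illustration (not part of the patch), the same semantics can be previewed with Python's standard `decimal` module, matching a few rows of the table documented in `set_decimal_context`:

{% highlight python %}
from decimal import Decimal, ROUND_CEILING, ROUND_FLOOR, ROUND_HALF_EVEN, ROUND_HALF_UP

# round to one digit, mirroring a few rows of the RoundingMode table
for value in ("5.5", "2.5", "-1.6"):
    d = Decimal(value)
    print(value,
          d.quantize(Decimal("1"), rounding=ROUND_HALF_UP),    # 6, 3, -2
          d.quantize(Decimal("1"), rounding=ROUND_HALF_EVEN),  # 6, 2, -2
          d.quantize(Decimal("1"), rounding=ROUND_CEILING),    # 6, 3, -1
          d.quantize(Decimal("1"), rounding=ROUND_FLOOR))      # 5, 2, -2
{% endhighlight %}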