Skip to content

Commit

Permalink
[hotfix][python] Align with Java Table API to remove QueryConfig (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
WeiZhong94 authored and sunjincheng121 committed Jul 10, 2019
1 parent be794ac commit 8acc1d3
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 150 deletions.
14 changes: 7 additions & 7 deletions docs/dev/table/streaming/query_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# obtain query configuration from TableEnvironment
q_config = StreamQueryConfig()
# use TableConfig instead of QueryConfig in python API
t_config = TableConfig()
# set query parameters
q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, t_config)

# define query
result = ...
Expand All @@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
sink) # table sink

# emit result Table via a TableSink
result.insert_into("outputTable", q_config)
result.insert_into("outputTable")

{% endhighlight %}
</div>
Expand Down
14 changes: 7 additions & 7 deletions docs/dev/table/streaming/query_configuration.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# obtain query configuration from TableEnvironment
q_config = StreamQueryConfig()
# use TableConfig instead of QueryConfig in python API
t_config = TableConfig()
# set query parameters
q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, t_config)

# define query
result = ...
Expand All @@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
sink) # table sink

# emit result Table via a TableSink
result.insert_into("outputTable", q_config)
result.insert_into("outputTable")

{% endhighlight %}
</div>
Expand Down
3 changes: 0 additions & 3 deletions flink-python/pyflink/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
"""
from __future__ import absolute_import

from pyflink.table.query_config import BatchQueryConfig, StreamQueryConfig
from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
WindowGroupedTable
from pyflink.table.table_config import TableConfig
Expand All @@ -74,8 +73,6 @@
'OverWindowedTable',
'WindowGroupedTable',
'TableConfig',
'StreamQueryConfig',
'BatchQueryConfig',
'TableSink',
'TableSource',
'WriteMode',
Expand Down
121 changes: 0 additions & 121 deletions flink-python/pyflink/table/query_config.py

This file was deleted.

155 changes: 149 additions & 6 deletions flink-python/pyflink/table/table_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
################################################################################
import sys

from py4j.compat import long
from pyflink.java_gateway import get_gateway

__all__ = ['TableConfig']
Expand All @@ -30,11 +31,12 @@ class TableConfig(object):
A config to define the runtime behavior of the Table API.
"""

def __init__(self):
self._jvm = get_gateway().jvm
self._j_table_config = self._jvm.TableConfig()
self._is_stream = None # type: bool
self._parallelism = None # type: int
def __init__(self, j_table_config=None):
gateway = get_gateway()
if j_table_config is None:
self._j_table_config = gateway.jvm.TableConfig()
else:
self._j_table_config = j_table_config

def get_timezone(self):
"""
Expand All @@ -52,7 +54,7 @@ def set_timezone(self, timezone_id):
"GMT-8:00".
"""
if timezone_id is not None and isinstance(timezone_id, (str, unicode)):
j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
j_timezone = get_gateway().jvm.java.util.TimeZone.getTimeZone(timezone_id)
self._j_table_config.setTimeZone(j_timezone)
else:
raise Exception("TableConfig.timezone should be a string!")
Expand Down Expand Up @@ -90,3 +92,144 @@ def set_max_generated_code_length(self, max_generated_code_length):
self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
else:
raise Exception("TableConfig.max_generated_code_length should be a int value!")

def set_idle_state_retention_time(self, min_time, max_time):
"""
Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
was not updated, will be retained.
State will never be cleared until it was idle for less than the minimum time and will never
be kept if it was idle for more than the maximum time.
When new data arrives for previously cleaned-up state, the new data will be handled as if it
was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
::
>>> table_config = TableConfig() \\
... .set_idle_state_retention_time(datetime.timedelta(days=1),
... datetime.timedelta(days=3))
.. note::
Cleaning up state requires additional bookkeeping which becomes less expensive for
larger differences of minTime and maxTime. The difference between minTime and maxTime
must be at least 5 minutes.
:param min_time: The minimum time interval for which idle state is retained. Set to
0 (zero) to never clean-up the state.
:type min_time: datetime.timedelta
:param max_time: The maximum time interval for which idle state is retained. Must be at
least 5 minutes greater than minTime. Set to
0 (zero) to never clean-up the state.
:type max_time: datetime.timedelta
"""
j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time)

def get_min_idle_state_retention_time(self):
"""
State might be cleared and removed if it was not updated for the defined period of time.
:return: The minimum time until state which was not updated will be retained.
:rtype: int
"""
return self._j_table_config.getMinIdleStateRetentionTime()

def get_max_idle_state_retention_time(self):
"""
State will be cleared and removed if it was not updated for the defined period of time.
:return: The maximum time until state which was not updated will be retained.
:rtype: int
"""
return self._j_table_config.getMaxIdleStateRetentionTime()

def set_decimal_context(self, precision, rounding_mode):
"""
Sets the default context for decimal division calculation.
(precision=34, rounding_mode=HALF_EVEN) by default.
The precision is the number of digits to be used for an operation. A value of 0 indicates
that unlimited precision (as many digits as are required) will be used. Note that leading
zeros (in the coefficient of a number) are never significant.
The rounding mode is the rounding algorithm to be used for an operation. It could be:
**UP**, **DOWN**, **CEILING**, **FLOOR**, **HALF_UP**, **HALF_DOWN**, **HALF_EVEN**,
**UNNECESSARY**
The table below shows the results of rounding input to one digit with the given rounding
mode:
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| Input | UP | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY |
+=======+====+======+=========+=======+=========+===========+===========+=============+
| 5.5 | 6 | 5 | 6 | 5 | 6 | 5 | 6 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 2.5 | 3 | 2 | 3 | 2 | 3 | 2 | 2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.6 | 2 | 1 | 2 | 1 | 2 | 2 | 2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 1.0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.0 | -1 | -1 | -1 | -1 | -1 | -1 | -1 | -1 |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.1 | -2 | -1 | -1 | -2 | -1 | -1 | -1 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| -1.6 | -2 | -1 | -1 | -2 | -2 | -2 | -2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 2.5 | -3 | -2 | -2 | -3 | -3 | -2 | -2 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
| 5.5 | -6 | -5 | -5 | -6 | -6 | -5 | -6 | Exception |
+-------+----+------+---------+-------+---------+-----------+-----------+-------------+
:param precision: The precision of the decimal context.
:type precision: int
:param rounding_mode: The rounding mode of the decimal context.
:type rounding_mode: str
"""
if rounding_mode not in (
"UP",
"DOWN",
"CEILING",
"FLOOR",
"HALF_UP",
"HALF_DOWN",
"HALF_EVEN",
"UNNECESSARY"):
raise ValueError("Unsupported rounding_mode: %s" % rounding_mode)
gateway = get_gateway()
j_rounding_mode = getattr(gateway.jvm.java.math.RoundingMode, rounding_mode)
j_math_context = gateway.jvm.java.math.MathContext(precision, j_rounding_mode)
self._j_table_config.setDecimalContext(j_math_context)

def get_decimal_context(self):
"""
Returns current context for decimal division calculation,
(precision=34, rounding_mode=HALF_EVEN) by default.
.. seealso:: :func:`set_decimal_context`
:return: the current context for decimal division calculation.
:rtype: (int, str)
"""
j_math_context = self._j_table_config.getDecimalContext()
precision = j_math_context.getPrecision()
rounding_mode = j_math_context.getRoundingMode().name()
return precision, rounding_mode

@staticmethod
def get_default():
"""
:return: A TableConfig object with default settings.
:rtype: TableConfig
"""
return TableConfig(get_gateway().jvm.TableConfig.getDefault())
Loading

0 comments on commit 8acc1d3

Please sign in to comment.