
[FLINK-18200][python] Replace the deprecated interfaces with the new interfaces in the tests and examples

This closes apache#12770.
dianfu committed Aug 19, 2020
1 parent 183f4c1 commit b6592fc
Showing 22 changed files with 235 additions and 311 deletions.
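At a glance, the change replaces the deprecated two-step pattern, Table.insert_into() followed by TableEnvironment.execute(), with Table.execute_insert(), which submits the job itself and returns a TableResult. A minimal before/after sketch, assuming PyFlink 1.11+; the sink name "my_sink" and its schema are illustrative, using the print connector:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance().in_batch_mode().build())
t_env.execute_sql(
    "CREATE TABLE my_sink (id BIGINT, msg STRING) WITH ('connector' = 'print')")

table = t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'msg'])

# Deprecated in 1.11: buffer the insert, then trigger it by job name.
#     table.insert_into("my_sink")
#     t_env.execute("job_name")

# New style: execute_insert submits the job and returns a TableResult.
result = table.execute_insert("my_sink")
# Block until the job finishes, the pattern used throughout this commit.
result.get_job_client().get_job_execution_result().result()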
4 changes: 1 addition & 3 deletions flink-python/dev/pip_test_code.py
@@ -42,9 +42,7 @@
.field("c", DataTypes.STRING())) \
.create_temporary_table("batch_sink")

t.select("a + 1, b, c").insert_into("batch_sink")

bt_env.execute("test")
t.select("a + 1, b, c").execute_insert("batch_sink").get_job_client().get_job_execution_result().result()

with open(sink_path, 'r') as f:
lines = f.read()
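The chained call above is dense; unpacked, it shows how the new API reaches the job result. This sketch only restates the one-liner from the file above, reusing its `t` and "batch_sink", and introduces no additional API:

# execute_insert() submits the job and returns a TableResult;
# the chained calls then block until the job finishes.
table_result = t.select("a + 1, b, c").execute_insert("batch_sink")
job_client = table_result.get_job_client()  # handle to the submitted job
job_client.get_job_execution_result().result()  # blocks until completion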
@@ -25,7 +25,7 @@
from pyflink.common import ExecutionConfig, RestartStrategies
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import DataTypes, BatchTableEnvironment, CsvTableSource, CsvTableSink
-from pyflink.testing.test_case_utils import PyFlinkTestCase
+from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table


class ExecutionEnvironmentTests(PyFlinkTestCase):
@@ -112,7 +112,7 @@ def test_get_execution_plan(self):
t_env.register_table_sink(
"Results",
CsvTableSink(field_names, field_types, tmp_csv))
t_env.scan("Orders").insert_into("Results")
exec_insert_table(t_env.from_path("Orders"), "Results")

plan = self.env.get_execution_plan()

@@ -127,8 +127,9 @@ def test_execute(self):
'Results',
CsvTableSink(field_names, field_types,
os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
-t_env.insert_into('Results', t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-execution_result = t_env.execute('test_batch_execute')
+execution_result = exec_insert_table(
+    t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']),
+    'Results')
self.assertIsNotNone(execution_result.get_job_id())
self.assertIsNotNone(execution_result.get_net_runtime())
self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
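The tests now funnel sink writes through a new exec_insert_table helper imported from pyflink.testing.test_case_utils. Its body is not part of this excerpt; judging from the call sites (the return value answers get_job_id(), get_net_runtime() and get_all_accumulator_results()), it plausibly boils down to something like this hypothetical sketch:

def exec_insert_table(table, table_path):
    """Insert `table` into the sink `table_path` and wait for the job to finish."""
    result = table.execute_insert(table_path)  # submits the job, returns a TableResult
    job_client = result.get_job_client()  # handle to the running job
    # Block and hand back the JobExecutionResult the assertions inspect.
    return job_client.get_job_execution_result().result()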
@@ -33,7 +33,7 @@
from pyflink.java_gateway import get_gateway
from pyflink.pyflink_gateway_server import on_windows
from pyflink.table import DataTypes, CsvTableSource, CsvTableSink, StreamTableEnvironment
-from pyflink.testing.test_case_utils import PyFlinkTestCase
+from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table


class StreamExecutionEnvironmentTests(PyFlinkTestCase):
@@ -197,7 +197,7 @@ def test_get_execution_plan(self):
t_env.register_table_sink(
"Results",
CsvTableSink(field_names, field_types, tmp_csv))
t_env.scan("Orders").insert_into("Results")
exec_insert_table(t_env.from_path("Orders"), "Results")

plan = self.env.get_execution_plan()

@@ -212,8 +212,9 @@ def test_execute(self):
'Results',
CsvTableSink(field_names, field_types,
os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
-t_env.insert_into('Results', t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-execution_result = t_env.execute('test_stream_execute')
+execution_result = exec_insert_table(
+    t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']),
+    'Results')
self.assertIsNotNone(execution_result.get_job_id())
self.assertIsNotNone(execution_result.get_net_runtime())
self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
35 changes: 10 additions & 25 deletions flink-python/pyflink/ml/tests/test_pipeline_it_case.py
@@ -16,17 +16,16 @@
# limitations under the License.
################################################################################

-from pyflink.table.types import DataTypes
-from pyflink.testing.test_case_utils import MLTestCase
-
+from pyflink import keyword
+from pyflink.java_gateway import get_gateway
from pyflink.ml.api import JavaTransformer, Transformer, Estimator, Model, \
    MLEnvironmentFactory, Pipeline
from pyflink.ml.api.param import WithParams, ParamInfo, TypeConverters
-from pyflink.ml.lib.param.colname import HasSelectedCols,\
+from pyflink.ml.lib.param.colname import HasSelectedCols, \
    HasPredictionCol, HasOutputCol
-from pyflink import keyword
+from pyflink.table.types import DataTypes
from pyflink.testing import source_sink_utils
-from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import MLTestCase, exec_insert_table


class HasVectorCol(WithParams):
@@ -103,8 +102,7 @@ def load_model(self, table_env):
"""
table_sink = source_sink_utils.TestRetractSink(["max_sum"], [DataTypes.BIGINT()])
table_env.register_table_sink("Model_Results", table_sink)
-self._model_data_table.insert_into("Model_Results")
-table_env.execute("load model")
+exec_insert_table(self._model_data_table, "Model_Results")
actual = source_sink_utils.results()
self.max_sum = actual.apply(0)

@@ -128,12 +126,7 @@ def test_java_transformer(self):

source_table = t_env.from_elements([(1, 2, 3, 4), (4, 3, 2, 1)], ['a', 'b', 'c', 'd'])
transformer = WrapperTransformer(selected_cols=["a", "b"])
-transformer\
-    .transform(t_env, source_table)\
-    .insert_into("TransformerResults")
-
-# execute
-t_env.execute('JavaPipelineITCase')
+exec_insert_table(transformer.transform(t_env, source_table), "TransformerResults")
actual = source_sink_utils.results()
self.assert_equals(actual, ["1,2", "4,3"])

@@ -158,12 +151,8 @@ def test_pipeline(self):

# pipeline
pipeline = Pipeline().append_stage(transformer).append_stage(estimator)
-pipeline\
-    .fit(t_env, train_table)\
-    .transform(t_env, serving_table)\
-    .insert_into('PredictResults')
-# execute
-t_env.execute('PipelineITCase')
+exec_insert_table(pipeline.fit(t_env, train_table).transform(t_env, serving_table),
+                  'PredictResults')

actual = source_sink_utils.results()
# the first input is false since 0 + 0 is smaller than the max_sum 14.
@@ -190,12 +179,8 @@ def test_pipeline_from_and_to_java_json(self):

source_table = t_env.from_elements([(1, 2, 3, 4), (4, 3, 2, 1)], ['a', 'b', 'c', 'd'])
transformer = p.get_stages()[0]
-transformer\
-    .transform(t_env, source_table)\
-    .insert_into("TestJsonResults")
+exec_insert_table(transformer.transform(t_env, source_table), "TestJsonResults")

-# execute
-t_env.execute('JavaPipelineITCase')
actual = source_sink_utils.results()

self.assert_equals(actual, ["1,2", "4,3"])
2 changes: 1 addition & 1 deletion flink-python/pyflink/table/examples/batch/word_count.py
@@ -62,7 +62,7 @@ def word_count():
'connector.path' = '{}'
)
""".format(result_path)
-t_env.sql_update(sink_ddl)
+t_env.execute_sql(sink_ddl)

elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
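sql_update() is superseded by execute_sql(), which runs the given statement right away; for INSERT statements it also returns a TableResult instead of buffering them until TableEnvironment.execute(). A small self-contained sketch; the sink "snk" and the sample data are illustrative:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().build())

# DDL executes immediately under execute_sql.
t_env.execute_sql(
    "CREATE TABLE snk (word STRING, cnt BIGINT) WITH ('connector' = 'print')")

# An insert submits a job and returns a TableResult.
t_env.from_elements([('hello', 1), ('flink', 1)], ['word', 'cnt']) \
    .execute_insert('snk') \
    .get_job_client().get_job_execution_result().result()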
11 changes: 6 additions & 5 deletions flink-python/pyflink/table/table.py
@@ -50,12 +50,11 @@ class Table(object):
>>> t_env = StreamTableEnvironment.create(env)
>>> ...
>>> t_env.register_table_source("source", ...)
>>> t = t_env.scan("source")
>>> t = t_env.from_path("source")
>>> t.select(...)
>>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
>>> t_env.execute("table_job")
>>> t.execute_insert("result")
Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`,
:func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by`
@@ -287,7 +286,8 @@ def join_lateral(self, table_function_call, join_predicate=None):
Example:
::
->>> t_env.register_java_function("split", "java.table.function.class.name")
+>>> t_env.create_java_temporary_system_function("split",
+...     "java.table.function.class.name")
>>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
:param table_function_call: An expression representing a table function call.
@@ -314,7 +314,8 @@ def left_outer_join_lateral(self, table_function_call, join_predicate=None):
Example:
::
->>> t_env.register_java_function("split", "java.table.function.class.name")
+>>> t_env.create_java_temporary_system_function("split",
+...     "java.table.function.class.name")
>>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
:param table_function_call: An expression representing a table function call.
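The same rename shows up in runnable form in the correlate tests further down. A condensed sketch of the new registration plus a lateral join, assuming the Java class org.apache.flink.table.utils.TableFunc1 (the one those tests use) is on the classpath:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().build())

# Renamed API: register_java_function() -> create_java_temporary_system_function().
t_env.create_java_temporary_system_function(
    "split", "org.apache.flink.table.utils.TableFunc1")

source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
# Correlate each input row with the rows produced by split(words).
result = source.join_lateral("split(words) as (word)").select("id, word")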
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/table_environment.py
@@ -121,6 +121,7 @@ def from_table_source(self, table_source):
:return: The result table.
:rtype: pyflink.table.Table
"""
warnings.warn("Deprecated in 1.11.", DeprecationWarning)
return Table(self._j_tenv.fromTableSource(table_source._j_table_source), self)

def register_catalog(self, catalog_name, catalog):
@@ -569,7 +570,7 @@ def insert_into(self, target_path, table):
.. note:: Deprecated in 1.11. Use :func:`execute_insert` for single sink,
use :func:`create_statement_set` for multiple sinks.
"""
warnings.warn("Deprecated in 1.11. Use execute_insert for single sink,"
warnings.warn("Deprecated in 1.11. Use Table#execute_insert for single sink,"
"use create_statement_set for multiple sinks.", DeprecationWarning)
self._j_tenv.insertInto(target_path, table._j_table)

@@ -1425,7 +1426,6 @@ def _from_elements(self, elements, schema):
:param elements: The elements to create a table from.
:return: The result :class:`~pyflink.table.Table`.
"""

# serializes to a file, and we read the file in java
temp_file = tempfile.NamedTemporaryFile(delete=False, dir=tempfile.mkdtemp())
serializer = BatchedSerializer(self._serializer)
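The reworded warning points single-sink pipelines at Table#execute_insert and multi-sink pipelines at create_statement_set. A minimal sketch of the multi-sink path; the sink names are illustrative, using the print connector again:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.execute_sql("CREATE TABLE sink_a (x BIGINT) WITH ('connector' = 'print')")
t_env.execute_sql("CREATE TABLE sink_b (x BIGINT) WITH ('connector' = 'print')")

table = t_env.from_elements([(1,), (2,)], ['x'])

stmt_set = t_env.create_statement_set()
stmt_set.add_insert("sink_a", table)  # buffered, not yet executed
stmt_set.add_insert("sink_b", table.filter("x > 1"))
stmt_set.execute()  # one job writes both sinks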
8 changes: 3 additions & 5 deletions flink-python/pyflink/table/tests/test_calc.py
@@ -24,7 +24,7 @@
from pyflink.table.tests.test_types import ExamplePoint, PythonOnlyPoint, ExamplePointUDT, \
PythonOnlyUDT
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, exec_insert_table


class StreamTableCalcTests(PyFlinkStreamTableTestCase):
@@ -95,8 +95,7 @@ def test_from_element(self):
{"key": 1.0}, bytearray(b'ABCD'), ExamplePoint(1.0, 2.0),
PythonOnlyPoint(3.0, 4.0))],
schema)
t.insert_into("Results")
t_env.execute("test")
exec_insert_table(t, "Results")
actual = source_sink_utils.results()

expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
@@ -140,8 +139,7 @@ def test_blink_from_element(self):
{"key": 1.0}, bytearray(b'ABCD'),
PythonOnlyPoint(3.0, 4.0))],
schema)
t.insert_into("Results")
t_env.execute("test")
exec_insert_table(t, "Results")
actual = source_sink_utils.results()

expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
12 changes: 8 additions & 4 deletions flink-python/pyflink/table/tests/test_correlate.py
@@ -22,7 +22,8 @@ class CorrelateTests(PyFlinkStreamTableTestCase):

def test_join_lateral(self):
t_env = self.t_env
t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
t_env.create_java_temporary_system_function("split",
"org.apache.flink.table.utils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.join_lateral("split(words) as (word)")
@@ -34,7 +35,8 @@ def test_join_lateral(self):

def test_join_lateral_with_join_predicate(self):
t_env = self.t_env
-t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+t_env.create_java_temporary_system_function("split",
+    "org.apache.flink.table.utils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.join_lateral("split(words) as (word)", "id = word")
@@ -47,7 +49,8 @@ def test_join_lateral_with_join_predicate(self):

def test_left_outer_join_lateral(self):
t_env = self.t_env
-t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+t_env.create_java_temporary_system_function("split",
+    "org.apache.flink.table.utils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

result = source.left_outer_join_lateral("split(words) as (word)")
@@ -59,7 +62,8 @@ def test_left_outer_join_lateral(self):

def test_left_outer_join_lateral_with_join_predicate(self):
t_env = self.t_env
-t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+t_env.create_java_temporary_system_function("split",
+    "org.apache.flink.table.utils.TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])

# only support "true" as the join predicate currently
