Commit 5dadbf3
[FLINK-18888][python] Support execute_async for StreamExecutionEnvironment. (apache#13126)
shuiqiangchen committed Aug 12, 2020
1 parent 64cdc8e commit 5dadbf3
Showing 5 changed files with 46 additions and 16 deletions.
15 changes: 15 additions & 0 deletions flink-python/pyflink/datastream/stream_execution_environment.py
@@ -21,6 +21,7 @@
from typing import List, Any

from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.job_client import JobClient
from pyflink.common.job_execution_result import JobExecutionResult
from pyflink.common.restart_strategy import RestartStrategies
from pyflink.common.typeinfo import PickledBytesTypeInfo, TypeInformation
@@ -420,6 +421,20 @@ def execute(self, job_name=None):
        else:
            return JobExecutionResult(self._j_stream_execution_environment.execute(job_name))

+    def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient:
+        """
+        Triggers the program execution asynchronously. The environment will execute all parts
+        of the program that have resulted in a "sink" operation. Sink operations are for
+        example printing results or forwarding them to a message queue.
+        The program execution will be logged and displayed with the provided job name.
+
+        :param job_name: Desired name of the job.
+        :return: A JobClient that can be used to communicate with the submitted job,
+                 available once job submission has succeeded.
+        """
+        j_job_client = self._j_stream_execution_environment.executeAsync(job_name)
+        return JobClient(j_job_client=j_job_client)
+
def get_execution_plan(self):
"""
Creates the plan with which the system will execute the program, and returns it as
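For context, here is a minimal usage sketch of the new API, pieced together from the code in this commit (the sink and job name are illustrative; DataStreamTestSinkFunction is the test utility touched below):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction

    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection([(1, 'Hi'), (2, 'Hello')],
                             type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(DataStreamTestSinkFunction())  # any sink works; this one is from the test utils

    # Unlike execute(), execute_async() returns as soon as the job is submitted.
    job_client = env.execute_async("my async job")
    print("submitted:", job_client.get_job_id())

    # Block only when the result is actually needed.
    result = job_client.get_job_execution_result().result()
    print("finished:", result.get_job_id())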
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -323,7 +323,7 @@ def test_keyed_stream_partitioning(self):
keyed_stream.forward()

def tearDown(self) -> None:
-        self.test_sink.get_results()
+        self.test_sink.clear()


class MyMapFunction(MapFunction):
flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -36,6 +36,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):

def setUp(self):
self.env = StreamExecutionEnvironment.get_execution_environment()
+        self.test_sink = DataStreamTestSinkFunction()

def test_get_config(self):
execution_config = self.env.get_config()
@@ -217,10 +218,9 @@ def test_execute(self):

def test_from_collection_without_data_types(self):
ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
        self.env.execute("test from collection")
-        results = test_sink.get_results(True)
+        results = self.test_sink.get_results(True)
        # user does not specify data types for input data, the collected result should be
        # in tuple format as inputs.
expected = ["(1, 'Hi', 'Hello')", "(2, 'Hello', 'Hi')"]
@@ -233,10 +233,9 @@ def test_from_collection_with_data_types(self):
type_info=Types.ROW([Types.INT(),
Types.STRING(),
Types.STRING()]))
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
        self.env.execute("test from collection")
-        results = test_sink.get_results(False)
+        results = self.test_sink.get_results(False)
# if user specifies data types of input data, the collected result should be in row format.
expected = ['1,Hi,Hello', '2,Hello,Hi']
results.sort()
@@ -246,10 +245,9 @@ def test_add_custom_source(self):
def test_add_custom_source(self):
custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction")
ds = self.env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
        self.env.execute("test add custom source")
-        results = test_sink.get_results(False)
+        results = self.test_sink.get_results(False)
expected = ['3,Mike', '1,Marry', '4,Ted', '5,Jack', '0,Bob', '2,Henry']
results.sort()
expected.sort()
@@ -264,10 +262,23 @@ def test_read_text_file(self):
f.write('\n')

ds = self.env.read_text_file(text_file_path)
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
        self.env.execute("test read text file")
-        results = test_sink.get_results()
+        results = self.test_sink.get_results()
results.sort()
texts.sort()
self.assertEqual(texts, results)

+    def test_execute_async(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
+                                      type_info=Types.ROW([Types.INT(),
+                                                           Types.STRING(),
+                                                           Types.STRING()]))
+        ds.add_sink(self.test_sink)
+        job_client = self.env.execute_async("test execute async")
+        job_id = job_client.get_job_id()
+        self.assertIsNotNone(job_id)
+        execution_result = job_client.get_job_execution_result().result()
+        self.assertEqual(str(job_id), str(execution_result.get_job_id()))
+
+    def tearDown(self) -> None:
+        self.test_sink.clear()
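Note that get_job_execution_result().result() blocks until the job finishes, which is exactly what the test above relies on. A non-blocking check is also possible; the following is a sketch that assumes JobClient.get_job_status() and the future's done() method behave like their Java counterparts:

    import time

    # job_client as returned by execute_async() above
    print("status:", job_client.get_job_status().result())

    future = job_client.get_job_execution_result()
    while not future.done():   # poll instead of blocking on result()
        time.sleep(1)
    print("final job id:", future.result().get_job_id())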
4 changes: 2 additions & 2 deletions flink-python/pyflink/datastream/tests/test_util.py
@@ -45,6 +45,6 @@ def get_results(self, is_python_object: bool = False):
return str_results

def clear(self):
-        if self._j_data_stream_test_collect_sink is None:
+        if self.j_data_stream_collect_sink is None:
            return
-        self._j_data_stream_test_collect_sink.collectAndClear()
+        self.j_data_stream_collect_sink.clear()
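With clear() now delegating to the Java sink's new clear() method, a test's sink lifecycle looks roughly like this (a sketch using the names from this diff; the surrounding test harness is assumed):

    sink = DataStreamTestSinkFunction()
    ds.add_sink(sink)
    env.execute("lifecycle example")

    results = sink.get_results(True)  # collect what the job emitted
    sink.clear()                      # reset the backing Java sink between tests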
flink-python/src/main/java/org/apache/flink/python/util/DataStreamTestCollectSink.java
@@ -52,7 +52,11 @@ public List<Object> collectAndClear(boolean isPythonObjects) {
listToBeReturned.add(obj.toString());
}
}
-        collectedResult.clear();
+        clear();
return listToBeReturned;
}

+    public void clear() {
+        collectedResult.clear();
+    }
}
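Extracting clear() on the Java side is what allows the Python DataStreamTestSinkFunction.clear() above to reset the sink without also collecting its results, which the new tearDown hooks depend on.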
