Skip to content

Commit

Permalink
[FLINK-19041][python] Add dependency management for ConnectedStream in Python DataStream API. (apache#13236)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiqiangchen committed Aug 26, 2020
1 parent f646781 commit 66797ac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
29 changes: 29 additions & 0 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# limitations under the License.
################################################################################
import decimal
import os
import uuid

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
Expand Down Expand Up @@ -121,6 +123,33 @@ def test_co_map_function_without_data_types(self):
results.sort()
self.assertEqual(expected, results)

def test_connected_streams_with_dependency(self):
    """Check that a file added via ``add_python_file`` is usable in a co-map.

    A small module defining ``add_two`` is written into a uniquely named
    directory under ``self.tempdir`` and registered with the environment.
    The source stream ``[1..5]`` is connected with a doubled copy of
    itself: ``map1`` applies ``add_two`` to the first input and ``map2``
    adds one to the second. The sink output is compared, after sorting,
    against the expected string values.
    """
    # A fresh UUID-suffixed directory keeps the generated module separate
    # from anything else placed under the shared temp directory.
    lib_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
    os.mkdir(lib_dir)
    lib_path = os.path.join(lib_dir, "test_stream_dependency_manage_lib.py")
    with open(lib_path, 'w') as lib_file:
        lib_file.write("def add_two(a):\n return a + 2")

    class TestCoMapFunction(CoMapFunction):

        def map1(self, value):
            # Imported at call time rather than at module load; presumably
            # the module only becomes importable once the file registered
            # with add_python_file has been shipped -- confirm.
            from test_stream_dependency_manage_lib import add_two
            return add_two(value)

        def map2(self, value):
            return value + 1

    self.env.add_python_file(lib_path)

    source = self.env.from_collection([1, 2, 3, 4, 5])
    doubled = source.map(lambda x: x * 2)
    connected = source.connect(doubled)
    connected.map(TestCoMapFunction()).add_sink(self.test_sink)
    self.env.execute("test co-map add python file")

    # Both sides are sorted so the comparison does not depend on the order
    # in which the two inputs' records reach the sink.
    actual = sorted(self.test_sink.get_results(True))
    expected = sorted(['11', '3', '3', '4', '5', '5', '6', '7', '7', '9'])
    self.assertEqual(expected, actual)

def test_co_map_function_with_data_types(self):
self.env.set_parallelism(1)
ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonPartitionCustomFunctionOperator;
import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonStatelessFunctionOperator;
import org.apache.flink.datastream.runtime.operators.python.DataStreamTwoInputPythonStatelessFunctionOperator;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
Expand All @@ -28,6 +29,7 @@
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperatorBase;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -131,13 +133,14 @@ public static StreamGraph generateStreamGraphWithDependencies(
StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
if (streamOperatorFactory instanceof SimpleOperatorFactory) {
StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
if (streamOperator instanceof DataStreamPythonStatelessFunctionOperator) {
DataStreamPythonStatelessFunctionOperator dataStreamPythonStatelessFunctionOperator =
(DataStreamPythonStatelessFunctionOperator) streamOperator;
if ((streamOperator instanceof DataStreamPythonStatelessFunctionOperator) ||
(streamOperator instanceof DataStreamTwoInputPythonStatelessFunctionOperator)) {
AbstractPythonFunctionOperatorBase abstractPythonFunctionOperatorBase =
(AbstractPythonFunctionOperatorBase) streamOperator;

Configuration oldConfig = dataStreamPythonStatelessFunctionOperator.getPythonConfig()
Configuration oldConfig = abstractPythonFunctionOperatorBase.getPythonConfig()
.getMergedConfig();
dataStreamPythonStatelessFunctionOperator.setPythonConfig(generateNewPythonConfig(oldConfig,
abstractPythonFunctionOperatorBase.setPythonConfig(generateNewPythonConfig(oldConfig,
mergedConfig));
}
}
Expand Down

0 comments on commit 66797ac

Please sign in to comment.