diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 3fec94720d516..864ea30f7c69a 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -117,7 +117,8 @@ private void startPython() throws IOException { String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); - process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")}); + String arguments = config.getString(PLAN_ARGUMENTS_KEY, ""); + process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments); outPrinter = new Thread(new StreamPrinter(process.getInputStream())); outPrinter.start(); errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 55cf1dc48610a..92a985c36cc32 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -37,18 +37,19 @@ protected boolean skipCollectionExecution() { return true; } - private static String findUtilsFile() throws Exception { + private static Path getBaseTestPythonDir() { FileSystem fs = FileSystem.getLocalFileSystem(); - return fs.getWorkingDirectory().toString() - + "/src/test/python/org/apache/flink/python/api/utils/utils.py"; + return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api"); + } + + private static String findUtilsFile() throws Exception { + return new Path(getBaseTestPythonDir(), "utils/utils.py").toString(); } private static List findTestFiles() throws Exception { List files = new ArrayList<>(); FileSystem fs = FileSystem.getLocalFileSystem(); - FileStatus[] status = fs.listStatus( - new Path(fs.getWorkingDirectory().toString() - + "/src/test/python/org/apache/flink/python/api")); + FileStatus[] status = fs.listStatus(getBaseTestPythonDir()); for (FileStatus f : status) { String file = f.getPath().toString(); if (file.endsWith(".py")) { @@ -126,11 +127,13 @@ protected void testProgram() throws Exception { if (python2 != null) { log.info("Running python2 tests"); runTestPrograms(python2); + runArgvTestPrograms(python2); } String python3 = getPython3Path(); if (python3 != null) { log.info("Running python3 tests"); runTestPrograms(python3); + runArgvTestPrograms(python3); } } @@ -177,4 +180,25 @@ private void testBoundCheck() throws Exception { // we expect this exception to be thrown since no argument was passed } } + + private void runArgvTestPrograms(String pythonBinary) throws Exception { + log.info("Running runArgvTestPrograms."); + String utils = findUtilsFile(); + + { + String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString(); + + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary); + new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils}); + } + + { + String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString(); + + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary); + new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"}); + } + } } diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py new file mode 100644 index 0000000000000..57b44c35cb7ea --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py @@ -0,0 +1,32 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.plan.Environment import get_environment +import sys +from utils import Verify + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(len(sys.argv)) + + d1.map_partition(Verify([3], "MultipleArguments")).output() + + #Execution + env.set_parallelism(1) + + env.execute(local=True) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py new file mode 100644 index 0000000000000..6afe7f2da8cb7 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py @@ -0,0 +1,32 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.plan.Environment import get_environment +import sys +from utils import Verify + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(len(sys.argv)) + + d1.map_partition(Verify([1], "NoArgument")).output() + + #Execution + env.set_parallelism(1) + + env.execute(local=True)