Skip to content

Commit

Permalink
[FLINK-12787][python] Allow to specify directory in option -pyfs
Browse files Browse the repository at this point in the history
This closes apache#8671
  • Loading branch information
dianfu authored and sunjincheng121 committed Jun 11, 2019
1 parent a21cc24 commit 74ef636
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,19 @@ protected ProgramOptions(CommandLine line) throws CliArgsException {
// -------------------------------transformed-------------------------------------------------------
// e.g. -py wordcount.py(CLI cmd) -----------> py wordcount.py(PythonDriver args)
// e.g. -py wordcount.py -pyfs file:https:///AAA.py,hdfs:https:///BBB.py --input in.txt --output out.txt(CLI cmd)
// -----> py wordcount.py pyfs file:https:///AAA.py,hdfs:https:///BBB.py --input in.txt --output out.txt(PythonDriver args)
// -----> -py wordcount.py -pyfs file:https:///AAA.py,hdfs:https:///BBB.py --input in.txt --output out.txt(PythonDriver args)
String[] newArgs;
int argIndex;
if (line.hasOption(PYFILES_OPTION.getOpt())) {
newArgs = new String[args.length + 4];
newArgs[2] = PYFILES_OPTION.getOpt();
newArgs[2] = "-" + PYFILES_OPTION.getOpt();
newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
argIndex = 4;
} else {
newArgs = new String[args.length + 2];
argIndex = 2;
}
newArgs[0] = PY_OPTION.getOpt();
newArgs[0] = "-" + PY_OPTION.getOpt();
newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
System.arraycopy(args, 0, newArgs, argIndex, args.length);
args = newArgs;
Expand All @@ -116,12 +116,12 @@ protected ProgramOptions(CommandLine line) throws CliArgsException {
}
// The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
// CLI cmd : -pym ${py-module} -pyfs ${py-files} [optional] ${other args}.
// PythonDriver args: pym ${py-module} pyfs ${py-files} [optional] ${other args}.
// e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) ----> pym AAA.fun -pyfs AAA.zip(PythonDriver args)
// PythonDriver args: -pym ${py-module} -pyfs ${py-files} [optional] ${other args}.
// e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) ----> -pym AAA.fun -pyfs AAA.zip(PythonDriver args)
String[] newArgs = new String[args.length + 4];
newArgs[0] = PYMODULE_OPTION.getOpt();
newArgs[0] = "-" + PYMODULE_OPTION.getOpt();
newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt());
newArgs[2] = PYFILES_OPTION.getOpt();
newArgs[2] = "-" + PYFILES_OPTION.getOpt();
newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
System.arraycopy(args, 0, newArgs, 4, args.length);
args = newArgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@

package org.apache.flink.python.client;

import org.apache.flink.core.fs.Path;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A main class used to launch Python applications. It executes python as a
Expand All @@ -47,19 +44,29 @@ public static void main(String[] args) {
LOG.error("Required at least two arguments, only python file or python module is available.");
System.exit(1);
}

// parse args
Map<String, List<String>> parsedArgs = parseOptions(args);
final CommandLineParser<PythonDriverOptions> commandLineParser = new CommandLineParser<>(
new PythonDriverOptionsParserFactory());
PythonDriverOptions pythonDriverOptions = null;
try {
pythonDriverOptions = commandLineParser.parse(args);
} catch (Exception e) {
LOG.error("Could not parse command line arguments {}.", args, e);
commandLineParser.printHelp(PythonDriver.class.getSimpleName());
System.exit(1);
}

// start gateway server
GatewayServer gatewayServer = startGatewayServer();
// prepare python env

// map filename to its Path
Map<String, Path> filePathMap = new HashMap<>();
// commands which will be exec in python progress.
List<String> commands = constructPythonCommands(filePathMap, parsedArgs);
final List<String> commands = constructPythonCommands(pythonDriverOptions);
try {
// prepare the exec environment of python progress.
PythonEnvUtils.PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(filePathMap);
PythonEnvUtils.PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
pythonDriverOptions.getPythonLibFiles());
// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python progress.
pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
// start the python process.
Expand All @@ -70,6 +77,10 @@ public static void main(String[] args) {
}
} catch (Throwable e) {
LOG.error("Run python process failed", e);

// throw ProgramAbortException if the caller is interested in the program plan,
// there is no harm to throw ProgramAbortException even if it is not the case.
throw new OptimizerPlanEnvironment.ProgramAbortException();
} finally {
gatewayServer.shutdown();
}
Expand All @@ -80,7 +91,7 @@ public static void main(String[] args) {
*
* @return The created GatewayServer
*/
public static GatewayServer startGatewayServer() {
static GatewayServer startGatewayServer() {
InetAddress localhost = InetAddress.getLoopbackAddress();
GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
.javaPort(0)
Expand All @@ -102,67 +113,13 @@ public static GatewayServer startGatewayServer() {
/**
* Constructs the Python commands which will be executed in python process.
*
* @param filePathMap stores python file name to its path
* @param parsedArgs parsed args
* @param pythonDriverOptions parsed Python command options
*/
public static List<String> constructPythonCommands(Map<String, Path> filePathMap, Map<String, List<String>> parsedArgs) {
List<String> commands = new ArrayList<>();
if (parsedArgs.containsKey("py")) {
String pythonFile = parsedArgs.get("py").get(0);
Path pythonFilePath = new Path(pythonFile);
filePathMap.put(pythonFilePath.getName(), pythonFilePath);
commands.add(pythonFilePath.getName());
}
if (parsedArgs.containsKey("pym")) {
String pyModule = parsedArgs.get("pym").get(0);
commands.add("-m");
commands.add(pyModule);
}
if (parsedArgs.containsKey("pyfs")) {
List<String> pyFiles = parsedArgs.get("pyfs");
for (String pyFile : pyFiles) {
Path pyFilePath = new Path(pyFile);
filePathMap.put(pyFilePath.getName(), pyFilePath);
}
}
if (parsedArgs.containsKey("args")) {
commands.addAll(parsedArgs.get("args"));
}
static List<String> constructPythonCommands(final PythonDriverOptions pythonDriverOptions) {
final List<String> commands = new ArrayList<>();
commands.add("-m");
commands.add(pythonDriverOptions.getEntrypointModule());
commands.addAll(pythonDriverOptions.getProgramArgs());
return commands;
}

/**
* Parses the args to the map format.
*
* @param args ["py", "xxx.py",
* "pyfs", "a.py,b.py,c.py",
* "--input", "in.txt"]
* @return {"py"->List("xxx.py"),"pyfs"->List("a.py","b.py","c.py"),"args"->List("--input","in.txt")}
*/
public static Map<String, List<String>> parseOptions(String[] args) {
Map<String, List<String>> parsedArgs = new HashMap<>();
int argIndex = 0;
boolean isEntrypointSpecified = false;
// valid args should include python or pyModule field and their value.
if (args[0].equals("py") || args[0].equals("pym")) {
parsedArgs.put(args[0], Collections.singletonList(args[1]));
argIndex = 2;
isEntrypointSpecified = true;
}
if (isEntrypointSpecified && args.length > 2 && args[2].equals("pyfs")) {
List<String> pyFilesList = new ArrayList<>(Arrays.asList(args[3].split(",")));
parsedArgs.put(args[2], pyFilesList);
argIndex = 4;
}
if (!isEntrypointSpecified) {
throw new RuntimeException("The Python entrypoint has not been specified. It can be specified with option -py or -pym");
}
// if arg include other args, the key "args" will map to other args.
if (args.length > argIndex) {
List<String> otherArgList = new ArrayList<>(args.length - argIndex);
otherArgList.addAll(Arrays.asList(args).subList(argIndex, args.length));
parsedArgs.put("args", otherArgList);
}
return parsedArgs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.python.client;

import org.apache.flink.core.fs.Path;

import javax.annotation.Nonnull;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
* Options for the {@link PythonDriver}.
*/
final class PythonDriverOptions {

@Nonnull
private String entrypointModule;

@Nonnull
private List<Path> pythonLibFiles;

@Nonnull
private List<String> programArgs;

PythonDriverOptions(String entrypointModule, List<Path> pythonLibFiles, List<String> programArgs) {
this.entrypointModule = requireNonNull(entrypointModule, "entrypointModule");
this.pythonLibFiles = requireNonNull(pythonLibFiles, "pythonLibFiles");
this.programArgs = requireNonNull(programArgs, "programArgs");
}

@Nonnull
String getEntrypointModule() {
return entrypointModule;
}

@Nonnull
List<Path> getPythonLibFiles() {
return pythonLibFiles;
}

@Nonnull
List<String> getProgramArgs() {
return programArgs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.python.client;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Parser factory which generates a {@link PythonDriverOptions} from a given
* list of command line arguments.
*/
final class PythonDriverOptionsParserFactory implements ParserResultFactory<PythonDriverOptions> {

private static final Option PY_OPTION = Option.builder("py")
.longOpt("python")
.required(false)
.hasArg(true)
.argName("entrypoint python file")
.desc("Python script with the program entry point. " +
"The dependent resources can be configured with the `--pyFiles` option.")
.build();

private static final Option PYMODULE_OPTION = Option.builder("pym")
.longOpt("pyModule")
.required(false)
.hasArg(true)
.argName("entrypoint module name")
.desc("Python module with the program entry point. " +
"This option must be used in conjunction with `--pyFiles`.")
.build();

private static final Option PYFILES_OPTION = Option.builder("pyfs")
.longOpt("pyFiles")
.required(false)
.hasArg(true)
.argName("entrypoint python file")
.desc("Attach custom python files for job. " +
"Comma can be used as the separator to specify multiple files. " +
"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
"(eg: --pyFiles file:https:///tmp/myresource.zip,hdfs:https:///$namenode_address/myresource2.zip)")
.build();

@Override
public Options getOptions() {
final Options options = new Options();
options.addOption(PY_OPTION);
options.addOption(PYMODULE_OPTION);
options.addOption(PYFILES_OPTION);
return options;
}

@Override
public PythonDriverOptions createResult(@Nonnull CommandLine commandLine) throws FlinkParseException {
String entrypointModule = null;
final List<Path> pythonLibFiles = new ArrayList<>();

if (commandLine.hasOption(PY_OPTION.getOpt()) && commandLine.hasOption(PYMODULE_OPTION.getOpt())) {
throw new FlinkParseException("Cannot use options -py and -pym simultaneously.");
} else if (commandLine.hasOption(PY_OPTION.getOpt())) {
Path file = new Path(commandLine.getOptionValue(PY_OPTION.getOpt()));
String fileName = file.getName();
if (fileName.endsWith(".py")) {
entrypointModule = fileName.substring(0, fileName.length() - 3);
pythonLibFiles.add(file);
}
} else if (commandLine.hasOption(PYMODULE_OPTION.getOpt())) {
entrypointModule = commandLine.getOptionValue(PYMODULE_OPTION.getOpt());
} else {
throw new FlinkParseException(
"The Python entrypoint has not been specified. It can be specified with options -py or -pym");
}

if (commandLine.hasOption(PYFILES_OPTION.getOpt())) {
pythonLibFiles.addAll(
Arrays.stream(commandLine.getOptionValue(PYFILES_OPTION.getOpt()).split(","))
.map(Path::new)
.collect(Collectors.toCollection(ArrayList::new)));
} else if (commandLine.hasOption(PYMODULE_OPTION.getOpt())) {
throw new FlinkParseException("Option -pym must be used in conjunction with `--pyFiles`");
}

return new PythonDriverOptions(entrypointModule, pythonLibFiles, commandLine.getArgList());
}
}
Loading

0 comments on commit 74ef636

Please sign in to comment.