Skip to content

Commit

Permalink
[FLINK-12541][container] Add support for Python jobs in StandaloneJob…
Browse files Browse the repository at this point in the history
…ClusterEntryPoint
  • Loading branch information
dianfu committed Jun 4, 2019
1 parent 6d0c815 commit 6033b4d
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
private final String[] programArguments;

@Nullable
private final String jobClassName;
private final String jobEntryPointName;

@Nullable
private final String jobPythonArtifacts;

@Nonnull
private final Supplier<Iterable<File>> jarsOnClassPath;
Expand All @@ -73,27 +76,31 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE);
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts) {
this(jobId, savepointRestoreSettings, programArguments, jobEntryPointName, jobPythonArtifacts, JarsOnClassPath.INSTANCE);
}

@VisibleForTesting
ClassPathJobGraphRetriever(
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName,
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts,
@Nonnull Supplier<Iterable<File>> jarsOnClassPath) {
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
this.jobEntryPointName = jobEntryPointName;
this.jobPythonArtifacts = jobPythonArtifacts;
this.jarsOnClassPath = requireNonNull(jarsOnClassPath, "jarsOnClassPath");
}

@Override
public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
final PackagedProgram packagedProgram = createPackagedProgram();
final String entryClass = getJobEntryPointNameOrScanClassPath();
final PackagedProgram packagedProgram = createPackagedProgram(entryClass, false);
final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
try {
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
Expand All @@ -110,19 +117,38 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
}
}

private PackagedProgram createPackagedProgram() throws FlinkException {
final String entryClass = getJobClassNameOrScanClassPath();
private PackagedProgram createPackagedProgram(final String entryClass, final boolean isPythonProgram) throws FlinkException {
try {
final Class<?> mainClass = getClass().getClassLoader().loadClass(entryClass);
return new PackagedProgram(mainClass, programArguments);
} catch (ClassNotFoundException | ProgramInvocationException e) {
return new PackagedProgram(mainClass, getProgramArguments(isPythonProgram));
} catch (ProgramInvocationException e) {
throw new FlinkException("Could not load the provided entrypoint class.", e);
} catch (ClassNotFoundException e) {
if (!isPythonProgram && jobPythonArtifacts != null) {
return createPackagedProgram("org.apache.flink.python.client.PythonDriver", true);
} else {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}
}

private String[] getProgramArguments(final boolean isPythonProgram) throws FlinkException {
if (isPythonProgram) {
final String[] args = new String[programArguments.length + 4];
args[0] = "pym";
args[1] = getJobEntryPointNameOrScanClassPath();
args[2] = "pyfs";
args[3] = jobPythonArtifacts;
System.arraycopy(programArguments, 0, args, 4, programArguments.length);
return args;
} else {
return programArguments;
}
}

private String getJobClassNameOrScanClassPath() throws FlinkException {
if (jobClassName != null) {
return jobClassName;
private String getJobEntryPointNameOrScanClassPath() throws FlinkException {
if (jobEntryPointName != null) {
return jobEntryPointName;
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
private final JobID jobId;

@Nullable
private final String jobClassName;
private final String jobEntryPointName;

@Nullable
private final String jobPythonArtifacts;

StandaloneJobClusterConfiguration(
@Nonnull String configDir,
Expand All @@ -51,11 +54,13 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
int restPort,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull JobID jobId,
@Nullable String jobClassName) {
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts) {
super(configDir, dynamicProperties, args, hostname, restPort);
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.jobId = requireNonNull(jobId, "jobId");
this.jobClassName = jobClassName;
this.jobEntryPointName = jobEntryPointName;
this.jobPythonArtifacts = jobPythonArtifacts;
}

@Nonnull
Expand All @@ -69,7 +74,12 @@ JobID getJobId() {
}

@Nullable
String getJobClassName() {
return jobClassName;
String getJobEntryPointName() {
return jobEntryPointName;
}

@Nullable
String getJobPythonArtifacts() {
return jobPythonArtifacts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,29 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes

static final JobID DEFAULT_JOB_ID = new JobID(0, 0);

private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
@Deprecated
private static final Option JOB_CLASS_NAME_OPTION = Option.builder("jc")
.longOpt("job-classname")
.required(false)
.hasArg(true)
.argName("job class name")
.desc("Class name of the job to run.")
.desc("Class name of the job to run. (deprecated; use job-entrypoint instead.)")
.build();

private static final Option JOB_ENTRYPOINT_NAME_OPTION = Option.builder("j")
.longOpt("job-entrypoint")
.required(false)
.hasArg(true)
.argName("job entrypoint name")
.desc("Entrypoint name of the job to run.")
.build();

private static final Option JOB_PYTHON_ARTIFACTS_OPTION = Option.builder("ja")
.longOpt("job-python-artifacts")
.required(false)
.hasArg(true)
.argName("job python artifacts")
.desc("Comma separated python artifacts of the job to run. The artifacts can be files or directories.")
.build();

private static final Option JOB_ID_OPTION = Option.builder("jid")
Expand All @@ -67,6 +84,8 @@ public Options getOptions() {
options.addOption(CONFIG_DIR_OPTION);
options.addOption(REST_PORT_OPTION);
options.addOption(JOB_CLASS_NAME_OPTION);
options.addOption(JOB_ENTRYPOINT_NAME_OPTION);
options.addOption(JOB_PYTHON_ARTIFACTS_OPTION);
options.addOption(JOB_ID_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
Expand All @@ -83,7 +102,12 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
final JobID jobId = getJobId(commandLine);
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
String jobEntryPointName = commandLine.getOptionValue(JOB_ENTRYPOINT_NAME_OPTION.getOpt());
if (jobEntryPointName == null) {
jobEntryPointName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
}

final String jobPythonArtifacts = commandLine.getOptionValue(JOB_PYTHON_ARTIFACTS_OPTION.getOpt());

return new StandaloneJobClusterConfiguration(
configDir,
Expand All @@ -93,7 +117,8 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
restPort,
savepointRestoreSettings,
jobId,
jobClassName);
jobEntryPointName,
jobPythonArtifacts);
}

private int getRestPort(CommandLine commandLine) throws FlinkParseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,31 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
private final String[] programArguments;

@Nullable
private final String jobClassName;
private final String jobEntryPointName;

@Nullable
private final String jobPythonArtifacts;

private StandaloneJobClusterEntryPoint(
Configuration configuration,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts) {
super(configuration);
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
this.jobEntryPointName = jobEntryPointName;
this.jobPythonArtifacts = jobPythonArtifacts;
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobEntryPointName, jobPythonArtifacts));
}

public static void main(String[] args) {
Expand Down Expand Up @@ -100,7 +105,8 @@ public static void main(String[] args) {
clusterConfiguration.getJobId(),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());
clusterConfiguration.getJobEntryPointName(),
clusterConfiguration.getJobPythonArtifacts());

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void testJobGraphRetrieval() throws FlinkException {
jobId,
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS,
TestJob.class.getCanonicalName());
TestJob.class.getCanonicalName(),
null);

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

Expand All @@ -84,6 +85,7 @@ public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundEx
PROGRAM_ARGUMENTS,
// No class name specified, but the test JAR "is" on the class path
null,
null,
() -> Collections.singleton(testJar));

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration());
Expand All @@ -102,6 +104,7 @@ public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws
// Both a class name is specified and a JAR "is" on the class path
// The class name should have precedence.
TestJob.class.getCanonicalName(),
null,
() -> Collections.singleton(testJar));

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration());
Expand All @@ -119,7 +122,8 @@ public void testSavepointRestoreSettings() throws FlinkException {
jobId,
savepointRestoreSettings,
PROGRAM_ARGUMENTS,
TestJob.class.getCanonicalName());
TestJob.class.getCanonicalName(),
null);

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
final int restPort = 1234;
final String arg1 = "arg1";
final String arg2 = "arg2";
final String[] args = {"--configDir", CONFIG_DIR, "--webui-port", String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};
final String[] args = {"--configDir", CONFIG_DIR, "--webui-port", String.valueOf(restPort), "--job-entrypoint", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(CONFIG_DIR)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getJobEntryPointName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();

Expand All @@ -90,7 +90,7 @@ public void testOnlyRequiredArguments() throws FlinkParseException {
assertThat(clusterConfiguration.getHostname(), is(nullValue()));
assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(not(nullValue())));
assertThat(clusterConfiguration.getJobClassName(), is(nullValue()));
assertThat(clusterConfiguration.getJobEntryPointName(), is(nullValue()));
}

@Test(expected = FlinkParseException.class)
Expand All @@ -116,7 +116,7 @@ public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
@Test
public void testSetJobIdManually() throws FlinkParseException {
final JobID jobId = new JobID();
final String[] args = {"--configDir", "/foo/bar", "--job-classname", "foobar", "--job-id", jobId.toString()};
final String[] args = {"--configDir", "/foo/bar", "--job-entrypoint", "foobar", "--job-id", jobId.toString()};

final StandaloneJobClusterConfiguration standaloneJobClusterConfiguration = commandLineParser.parse(args);

Expand All @@ -126,7 +126,7 @@ public void testSetJobIdManually() throws FlinkParseException {
@Test
public void testInvalidJobIdThrows() {
final String invalidJobId = "0xINVALID";
final String[] args = {"--configDir", "/foo/bar", "--job-classname", "foobar", "--job-id", invalidJobId};
final String[] args = {"--configDir", "/foo/bar", "--job-entrypoint", "foobar", "--job-id", invalidJobId};

try {
commandLineParser.parse(args);
Expand Down Expand Up @@ -155,7 +155,7 @@ public void testShortOptions() throws FlinkParseException {
final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getJobEntryPointName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getJobId(), is(equalTo(jobId)));

final SavepointRestoreSettings savepointRestoreSettings = clusterConfiguration.getSavepointRestoreSettings();
Expand Down

0 comments on commit 6033b4d

Please sign in to comment.