Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12541][REST] Support to submit Python Table API jobs via REST API #8532

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
[FLINK-12541][REST] Support to submit Python jobs via REST API
  • Loading branch information
dianfu committed Jun 20, 2019
commit d24c4f7af0dbf8890d87f040a932121ef9e8d869
Original file line number Diff line number Diff line change
Expand Up @@ -100,42 +100,47 @@ protected CompletableFuture<ArtifactListInfo> handleRequest(@Nonnull HandlerRequ
continue;
}
String name = id.substring(startIndex + 1);
if (name.length() < 5 || !name.endsWith(".jar")) {
if (!((name.length() >= 5 && name.endsWith(".jar")) ||
(name.length() >= 4 && name.endsWith(".py")) ||
(name.length() >= 5 && name.endsWith(".zip")) ||
(name.length() >= 5 && name.endsWith(".egg")))) {
continue;
}

List<ArtifactListInfo.ArtifactEntryInfo> artifactEntryList = new ArrayList<>();
String[] classes = new String[0];
try (JarFile jar = new JarFile(f)) {
Manifest manifest = jar.getManifest();
String assemblerClass = null;

if (manifest != null) {
assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
if (assemblerClass == null) {
assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
if (name.endsWith(".jar")) {
String[] classes = new String[0];
try (JarFile jar = new JarFile(f)) {
Manifest manifest = jar.getManifest();
String assemblerClass = null;

if (manifest != null) {
assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
if (assemblerClass == null) {
assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
}
}
if (assemblerClass != null) {
classes = assemblerClass.split(",");
}
} catch (IOException ignored) {
// we simply show no entries here
}
if (assemblerClass != null) {
classes = assemblerClass.split(",");
}
} catch (IOException ignored) {
// we simply show no entries here
}

// show every entry class that can be loaded later on.
for (String clazz : classes) {
clazz = clazz.trim();
// show every entry class that can be loaded later on.
for (String clazz : classes) {
clazz = clazz.trim();

PackagedProgram program = null;
try {
program = new PackagedProgram(f, clazz, new String[0]);
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a PackagedProgram
}
if (program != null) {
ArtifactListInfo.ArtifactEntryInfo artifactEntryInfo = new ArtifactListInfo.ArtifactEntryInfo(clazz, program.getDescription());
artifactEntryList.add(artifactEntryInfo);
PackagedProgram program = null;
try {
program = new PackagedProgram(f, clazz, new String[0]);
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a PackagedProgram
}
if (program != null) {
ArtifactListInfo.ArtifactEntryInfo artifactEntryInfo = new ArtifactListInfo.ArtifactEntryInfo(clazz, program.getDescription());
artifactEntryList.add(artifactEntryInfo);
}
}
}

Expand All @@ -150,7 +155,9 @@ protected CompletableFuture<ArtifactListInfo> handleRequest(@Nonnull HandlerRequ
}

private File[] getArtifactFiles() {
final File[] list = artifactDir.listFiles((dir, name) -> name.endsWith(".jar"));
final File[] list = artifactDir.listFiles(
(dir, name) ->
name.endsWith(".jar") || name.endsWith(".py") || name.endsWith(".zip") || name.endsWith(".egg"));
if (list == null) {
log.warn("Artifact upload dir {} does not exist, or had been deleted externally. " +
"Previously uploaded artifacts are no longer available.", artifactDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static java.util.Objects.requireNonNull;

/**
* Handles artifact file uploads.
* Handles artifact file uploads, such as .jar, .py, .zip, .egg, etc.
*/
public class ArtifactUploadHandler extends
AbstractRestHandler<RestfulGateway, EmptyRequestBody, ArtifactUploadResponseBody, EmptyMessageParameters> {
Expand Down Expand Up @@ -76,10 +76,14 @@ protected CompletableFuture<ArtifactUploadResponseBody> handleRequest(
throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
final Path fileUpload = uploadedFiles.iterator().next().toPath();
final String fileUploadFileName = fileUpload.getFileName().toString();
return CompletableFuture.supplyAsync(() -> {
if (!fileUpload.getFileName().toString().endsWith(".jar")) {
if (!fileUploadFileName.endsWith(".jar") &&
!fileUploadFileName.endsWith(".py") &&
!fileUploadFileName.endsWith(".zip") &&
!fileUploadFileName.endsWith(".egg")) {
throw new CompletionException(new RestHandlerException(
"Only Jar files are allowed.",
"Only .jar, .py, .zip, .egg files are allowed.",
HttpResponseStatus.BAD_REQUEST));
} else {
final Path destination = artifactDir.resolve(UUID.randomUUID() + "_" + fileUpload.getFileName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
Expand All @@ -31,7 +32,6 @@
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.ArtifactIdPathParameter;
import org.apache.flink.runtime.webmonitor.handlers.ArtifactRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.ArtifactRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.EntryClassQueryParameter;
import org.apache.flink.runtime.webmonitor.handlers.ParallelismQueryParameter;
import org.apache.flink.runtime.webmonitor.handlers.ProgramArgQueryParameter;
Expand All @@ -44,14 +44,26 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
Expand All @@ -60,7 +72,7 @@
/**
* Utils for artifact handlers.
*
* @see ArtifactRunHandler
* @see org.apache.flink.runtime.webmonitor.handlers.ArtifactRunHandler
* @see org.apache.flink.runtime.webmonitor.handlers.ArtifactPlanHandler
*/
public class ArtifactHandlerUtils {
Expand All @@ -72,10 +84,12 @@ public static class ArtifactHandlerContext {
private final List<String> programArgs;
private final int parallelism;
private final JobID jobId;
private File dependentJarFile;

private ArtifactHandlerContext(Path artifactFile, String entryClass, List<String> programArgs, int parallelism, JobID jobId) {
private ArtifactHandlerContext(Path artifactFile, String entryClass, File dependentJarFile, List<String> programArgs, int parallelism, JobID jobId) {
this.artifactFile = artifactFile;
this.entryClass = entryClass;
this.dependentJarFile = dependentJarFile;
this.programArgs = programArgs;
this.parallelism = parallelism;
this.jobId = jobId;
Expand Down Expand Up @@ -110,7 +124,49 @@ public static <R extends ArtifactRequestBody> ArtifactHandlerContext fromRequest
null, // Delegate default job ID to actual JobGraph generation
log);

return new ArtifactHandlerContext(artifactFile, entryClass, programArgs, parallelism, jobId);
File dependentJarFile = null;
final String artifactFileName = artifactFile.getFileName().toString();
if (artifactFileName.endsWith(".py") ||
artifactFileName.endsWith(".zip") ||
artifactFileName.endsWith(".egg")) {
final List<String> list = new ArrayList<>();
if (artifactFileName.endsWith(".py")) {
list.add("-py");
list.add(artifactFile.toFile().getAbsolutePath());
} else {
if (entryClass == null) {
throw new CompletionException(new RestHandlerException(
"Entry class is missing", HttpResponseStatus.BAD_REQUEST));
}

list.add("-pym");
list.add(entryClass);
list.add("-pyfs");

final StringBuilder pyFiles = new StringBuilder();
pyFiles.append(artifactFile.toFile().getAbsolutePath());
// extract the contained libraries under directory lib of the zip file
if (artifactFileName.endsWith(".zip")) {
Tuple2<File, List<File>> containedLibraries =
extractContainedLibraries(artifactFile.toFile().getAbsoluteFile().toURI());
dependentJarFile = containedLibraries.f0;

if (!containedLibraries.f1.isEmpty()) {
pyFiles.append(",");
pyFiles.append(containedLibraries.f1.stream()
.map(File::getAbsolutePath).collect(Collectors.joining(",")));
}
}
list.add(pyFiles.toString());
}
list.addAll(programArgs);

// sets the entry class to PythonDriver for Python jobs
entryClass = "org.apache.flink.python.client.PythonDriver";
programArgs = list;
}

return new ArtifactHandlerContext(artifactFile, entryClass, dependentJarFile, programArgs, parallelism, jobId);
}

public JobGraph toJobGraph(Configuration configuration) {
Expand All @@ -119,9 +175,18 @@ public JobGraph toJobGraph(Configuration configuration) {
String.format("Artifact file %s does not exist", artifactFile), HttpResponseStatus.BAD_REQUEST));
}

File jarFile = null;
if (artifactFile.getFileName().toString().endsWith(".jar")) {
// the artifact is Java
jarFile = artifactFile.toFile();
} else if (dependentJarFile != null) {
// the artifact is Python
jarFile = dependentJarFile;
}

try {
final PackagedProgram packagedProgram = new PackagedProgram(
artifactFile.toFile(),
jarFile,
entryClass,
programArgs.toArray(new String[0]));
return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism, jobId);
Expand Down Expand Up @@ -160,6 +225,102 @@ private static <R extends ArtifactRequestBody, M extends MessageParameters> List
}
}

@VisibleForTesting
protected static Tuple2<File, List<File>> extractContainedLibraries(URI artifactFile) {
	try (ZipFile zipFile = new ZipFile(new File(artifactFile))) {
		// First pass: locate the (at most one) nested jar and every nested Python
		// artifact (.py/.zip/.egg) stored under the "lib/" directory of the zip.
		ZipEntry nestedJarEntry = null;
		final List<ZipEntry> nestedPythonEntries = new ArrayList<>();

		final Enumeration<? extends ZipEntry> entries = zipFile.entries();
		while (entries.hasMoreElements()) {
			final ZipEntry entry = entries.nextElement();
			final String entryName = entry.getName();
			final boolean inLibDir = entryName.startsWith("lib/");

			// "lib/" + at least one character + ".jar" => strictly more than 8 chars.
			if (inLibDir && entryName.length() > 8 && entryName.endsWith(".jar")) {
				if (nestedJarEntry != null) {
					throw new RuntimeException(
						String.format("Artifact file %s contains multiple jar files", artifactFile));
				}
				nestedJarEntry = entry;
			}

			// "lib/" + at least one character + ".py" => strictly more than 7 chars.
			if (inLibDir && entryName.length() > 7 &&
					(entryName.endsWith(".py") || entryName.endsWith(".zip") || entryName.endsWith(".egg"))) {
				nestedPythonEntries.add(entry);
			}
		}

		// Second pass: extract the selected entries to temporary files. If any
		// extraction fails, clean up whatever has been extracted so far.
		File tempJar = null;
		final List<File> tempLibraries = new ArrayList<>(nestedPythonEntries.size());
		boolean success = false;
		try {
			if (nestedJarEntry != null) {
				tempJar = extractZipEntry(zipFile, nestedJarEntry);
			}

			for (ZipEntry nestedPythonEntry : nestedPythonEntries) {
				tempLibraries.add(extractZipEntry(zipFile, nestedPythonEntry));
			}

			success = true;
		} finally {
			if (!success) {
				if (tempJar != null) {
					tempJar.delete();
				}
				tempLibraries.forEach(File::delete);
			}
		}

		return Tuple2.of(tempJar, tempLibraries);
	} catch (Throwable t) {
		throw new CompletionException(new RestHandlerException(
			"Unknown I/O error while extracting contained library files.",
			HttpResponseStatus.BAD_REQUEST,
			t));
	}
}

/**
 * Extracts a single zip entry into a freshly created temporary file.
 *
 * <p>The temporary file is flagged with {@link File#deleteOnExit()}; callers that
 * want earlier cleanup (see the caller's failure path) must delete it themselves.
 *
 * @param zipFile the open zip archive to read from
 * @param zipEntry the entry to extract
 * @return the temporary file holding the entry's contents
 * @throws RuntimeException if creating or writing the temporary file fails
 */
private static File extractZipEntry(final ZipFile zipFile, final ZipEntry zipEntry) {
	final Random rnd = new Random();
	final byte[] buffer = new byte[4096];
	String name = zipEntry.getName();
	// '/' as in case of zip, jar
	// java.util.zip.ZipEntry#isDirectory always looks only for '/' not for File.separator
	name = name.replace('/', '_');

	File tempFile;
	try {
		// File.createTempFile requires a prefix of at least 3 characters. A small random
		// number (e.g. "7" -> prefix "7_") would violate that and throw
		// IllegalArgumentException, so zero-pad the numeric part to a fixed width.
		final String prefix = String.format("%010d_", rnd.nextInt(Integer.MAX_VALUE));
		tempFile = File.createTempFile(prefix, name);
		tempFile.deleteOnExit();
	} catch (IOException e) {
		throw new RuntimeException(
			String.format("An I/O error occurred while creating temporary file to extract nested " +
				"library '%s'.", zipEntry.getName()), e);
	}

	// copy the temp file contents to a temporary File
	try (
		OutputStream out = new FileOutputStream(tempFile);
		InputStream in = new BufferedInputStream(zipFile.getInputStream(zipEntry))) {

		int numRead;
		while ((numRead = in.read(buffer)) != -1) {
			out.write(buffer, 0, numRead);
		}
	} catch (IOException e) {
		throw new RuntimeException(
			String.format(
				"An I/O error occurred while extracting nested library '%s' to " +
				"temporary file '%s'.", zipEntry.getName(), tempFile.getAbsolutePath()), e);
	}

	return tempFile;
}

private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");

/**
Expand Down
Loading