diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java deleted file mode 100644 index 65471a019d3e8..0000000000000 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client.deployment.application; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.PackagedProgramRetriever; -import org.apache.flink.client.program.PackagedProgramUtils; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.function.FunctionUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.NoSuchElementException; -import java.util.function.Supplier; -import java.util.jar.JarEntry; -import java.util.jar.JarFile; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - -/** - * A {@link org.apache.flink.client.program.PackagedProgramRetriever PackagedProgramRetriever} which - * creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing - * the user's {@code main()} from a class on the class path. - */ -@Internal -public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever { - - private static final Logger LOG = - LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class); - - /** User classpaths in relative form to the working directory. */ - @Nonnull private final Collection userClassPaths; - - @Nonnull private final String[] programArguments; - - @Nullable private final String jobClassName; - - @Nonnull private final Supplier> jarsOnClassPath; - - @Nullable private final File userLibDirectory; - - @Nullable private final File jarFile; - - private ClassPathPackagedProgramRetriever( - @Nonnull String[] programArguments, - @Nullable String jobClassName, - @Nonnull Supplier> jarsOnClassPath, - @Nullable File userLibDirectory, - @Nullable File jarFile) - throws IOException { - this.userLibDirectory = userLibDirectory; - this.programArguments = requireNonNull(programArguments, "programArguments"); - this.jobClassName = jobClassName; - this.jarsOnClassPath = requireNonNull(jarsOnClassPath); - this.userClassPaths = discoverUserClassPaths(userLibDirectory); - this.jarFile = jarFile; - } - - private Collection discoverUserClassPaths(@Nullable File jobDir) throws IOException { - if (jobDir == null) { - return Collections.emptyList(); - } - - final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); - final Collection relativeJarURLs = - FileUtils.listFilesInDirectory(jobDir.toPath(), FileUtils::isJarFile).stream() - .map(path -> FileUtils.relativizePath(workingDirectory, path)) - .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) - .collect(Collectors.toList()); - return Collections.unmodifiableCollection(relativeJarURLs); - } - - @Override - public PackagedProgram getPackagedProgram() throws FlinkException { - try { - // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", - // set the fixed - // jobClassName and jarFile path. - if (PackagedProgramUtils.isPython(jobClassName) - || PackagedProgramUtils.isPython(programArguments)) { - String pythonJobClassName = PackagedProgramUtils.getPythonDriverClassName(); - File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath()); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(pythonJarFile) - .setEntryPointClassName(pythonJobClassName) - .build(); - } - - if (jarFile != null) { - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setArguments(programArguments) - .setJarFile(jarFile) - .setEntryPointClassName(jobClassName) - .build(); - } - - final String entryClass = getJobClassNameOrScanClassPath(); - return PackagedProgram.newBuilder() - .setUserClassPaths(new ArrayList<>(userClassPaths)) - .setEntryPointClassName(entryClass) - .setArguments(programArguments) - .build(); - } catch (ProgramInvocationException e) { - throw new FlinkException("Could not load the provided entrypoint class.", e); - } - } - - private String getJobClassNameOrScanClassPath() throws FlinkException { - if (jobClassName != null) { - if (userLibDirectory != null) { - // check that we find the entrypoint class in the user lib directory. - if (!userClassPathContainsJobClass(jobClassName)) { - throw new FlinkException( - String.format( - "Could not find the provided job class (%s) in the user lib directory (%s).", - jobClassName, userLibDirectory)); - } - } - return jobClassName; - } - - try { - return scanClassPathForJobJar(); - } catch (IOException | NoSuchElementException | IllegalArgumentException e) { - throw new FlinkException( - "Failed to find job JAR on class path. Please provide the job class name explicitly.", - e); - } - } - - private boolean userClassPathContainsJobClass(String jobClassName) { - for (URL userClassPath : userClassPaths) { - try (final JarFile jarFile = new JarFile(userClassPath.getFile())) { - if (jarContainsJobClass(jobClassName, jarFile)) { - return true; - } - } catch (IOException e) { - ExceptionUtils.rethrow( - e, - String.format( - "Failed to open user class path %s. Make sure that all files on the user class path can be accessed.", - userClassPath)); - } - } - return false; - } - - private boolean jarContainsJobClass(String jobClassName, JarFile jarFile) { - return jarFile.stream() - .map(JarEntry::getName) - .filter(fileName -> fileName.endsWith(FileUtils.CLASS_FILE_EXTENSION)) - .map(FileUtils::stripFileExtension) - .map( - fileName -> - fileName.replaceAll( - Pattern.quote(File.separator), FileUtils.PACKAGE_SEPARATOR)) - .anyMatch(name -> name.equals(jobClassName)); - } - - private String scanClassPathForJobJar() throws IOException { - final Iterable jars; - if (userLibDirectory == null) { - LOG.info("Scanning system class path for job JAR"); - jars = jarsOnClassPath.get(); - } else { - LOG.info("Scanning user class path for job JAR"); - jars = - userClassPaths.stream() - .map(url -> new File(url.getFile())) - .collect(Collectors.toList()); - } - - final JarManifestParser.JarFileWithEntryClass jobJar = - JarManifestParser.findOnlyEntryClass(jars); - LOG.info("Using {} as job jar", jobJar); - return jobJar.getEntryClass(); - } - - @VisibleForTesting - enum JarsOnClassPath implements Supplier> { - INSTANCE; - - static final String JAVA_CLASS_PATH = "java.class.path"; - static final String PATH_SEPARATOR = "path.separator"; - static final String DEFAULT_PATH_SEPARATOR = ":"; - - @Override - public Iterable get() { - String classPath = System.getProperty(JAVA_CLASS_PATH, ""); - String pathSeparator = System.getProperty(PATH_SEPARATOR, DEFAULT_PATH_SEPARATOR); - - return Arrays.stream(classPath.split(pathSeparator)) - .filter(JarsOnClassPath::notNullAndNotEmpty) - .map(File::new) - .filter(File::isFile) - .collect(Collectors.toList()); - } - - private static boolean notNullAndNotEmpty(String string) { - return string != null && !string.equals(""); - } - } - - /** A builder for the {@link ClassPathPackagedProgramRetriever}. */ - public static class Builder { - - private final String[] programArguments; - - @Nullable private String jobClassName; - - @Nullable private File userLibDirectory; - - private Supplier> jarsOnClassPath = JarsOnClassPath.INSTANCE; - - private File jarFile; - - private Builder(String[] programArguments) { - this.programArguments = requireNonNull(programArguments); - } - - public Builder setJobClassName(@Nullable String jobClassName) { - this.jobClassName = jobClassName; - return this; - } - - public Builder setUserLibDirectory(File userLibDirectory) { - this.userLibDirectory = userLibDirectory; - return this; - } - - public Builder setJarsOnClassPath(Supplier> jarsOnClassPath) { - this.jarsOnClassPath = jarsOnClassPath; - return this; - } - - public Builder setJarFile(File file) { - this.jarFile = file; - return this; - } - - public ClassPathPackagedProgramRetriever build() throws IOException { - return new ClassPathPackagedProgramRetriever( - programArguments, jobClassName, jarsOnClassPath, userLibDirectory, jarFile); - } - } - - public static Builder newBuilder(String[] programArguments) { - return new Builder(programArguments); - } -} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EntryClassInformationProvider.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EntryClassInformationProvider.java new file mode 100644 index 0000000000000..bb97e09b5c395 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EntryClassInformationProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import java.io.File; +import java.util.Optional; + +/** {@code EntryClassInformationProvider} provides information about the entry class. */ +public interface EntryClassInformationProvider { + + /** + * Returns the {@link File} referring to the Jar file that contains the job class or no {@code + * File} if the job class is located on the classpath. + * + * @return The {@code File} referring to the job's Jar archive or an empty {@code Optional} in + * case the job shall be extracted from the classpath. + */ + Optional getJarFile(); + + /** + * Returns the name of the job class or an empty {@code Optional} if the job class name cannot + * be provided. + * + * @return The name of the job class or an empty {@code Optional}. + */ + Optional getJobClassName(); +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProvider.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProvider.java new file mode 100644 index 0000000000000..7b9ade424f3ea --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProvider.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * {@code FromClasspathEntryClassInformationProvider} assumes the passed job class being available + * on some classpath. + */ +public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider { + + private final String jobClassName; + + /** + * Creates a {@code FromClasspathEntryClassInformationProvider} based on the passed job class + * and classpath. + * + * @param jobClassName The job's class name. + * @param classpath The classpath the job class should be part of. + * @return The {@code FromClasspathEntryClassInformationProvider} instances collecting the + * necessary information. + * @throws IOException If some Jar listed on the classpath wasn't accessible. + * @throws FlinkException If the passed job class is not present on the passed classpath. + */ + public static FromClasspathEntryClassInformationProvider create( + String jobClassName, Iterable classpath) throws IOException, FlinkException { + Preconditions.checkNotNull(jobClassName, "No job class name passed."); + Preconditions.checkNotNull(classpath, "No classpath passed."); + if (!userClasspathContainsJobClass(jobClassName, classpath)) { + throw new FlinkException( + String.format( + "Could not find the provided job class (%s) in the user lib directory.", + jobClassName)); + } + + return new FromClasspathEntryClassInformationProvider(jobClassName); + } + + /** + * Creates a {@code FromClasspathEntryClassInformationProvider} looking for the entry class + * providing the main method on the passed classpath. + * + * @param classpath The classpath the job class is expected to be part of. + * @return The {@code FromClasspathEntryClassInformationProvider} providing the job class found + * on the passed classpath. + * @throws IOException If some Jar listed on the classpath wasn't accessible. + * @throws FlinkException Either no or too many main methods were found on the classpath. + */ + public static FromClasspathEntryClassInformationProvider createFromClasspath( + Iterable classpath) throws IOException, FlinkException { + return new FromClasspathEntryClassInformationProvider( + extractJobClassFromUrlClasspath(classpath)); + } + + /** + * Creates a {@code FromClasspathEntryClassInformationProvider} looking for the entry class + * providing the main method on the system classpath. + * + * @return The {@code FromClasspathEntryClassInformationProvider} providing the job class found + * on the system classpath. + * @throws IOException If some Jar listed on the system classpath wasn't accessible. + * @throws FlinkException Either no or too many main methods were found on the system classpath. + */ + public static FromClasspathEntryClassInformationProvider createFromSystemClasspath() + throws IOException, FlinkException { + return new FromClasspathEntryClassInformationProvider(extractJobClassFromSystemClasspath()); + } + + /** + * Creates a {@code FromClasspathEntryClassInformationProvider} assuming that the passed job + * class is available on the system classpath. + * + * @param jobClassName The job class name working as the entry point. + * @return The {@code FromClasspathEntryClassInformationProvider} providing the job class found. + */ + public static FromClasspathEntryClassInformationProvider + createWithJobClassAssumingOnSystemClasspath(String jobClassName) { + return new FromClasspathEntryClassInformationProvider(jobClassName); + } + + private FromClasspathEntryClassInformationProvider(String jobClassName) { + this.jobClassName = Preconditions.checkNotNull(jobClassName, "No job class name set."); + } + + /** + * Always returns an empty {@code Optional} because this implementation relies on the JAR + * archive being available on either the user or the system classpath. + * + * @return An empty {@code Optional}. + */ + @Override + public Optional getJarFile() { + return Optional.empty(); + } + + /** + * Returns the job class name if it could be derived from the specified classpath or was + * explicitly specified. + * + * @return The job class name or an empty {@code Optional} if none was specified and it couldn't + * be derived from the classpath. + */ + @Override + public Optional getJobClassName() { + return Optional.of(jobClassName); + } + + @VisibleForTesting + static Iterable extractSystemClasspath() { + final String classpathPropertyValue = System.getProperty("java.class.path", ""); + final String pathSeparator = System.getProperty("path.separator", ":"); + + return Arrays.stream(classpathPropertyValue.split(pathSeparator)) + .filter(entry -> !StringUtils.isNullOrWhitespaceOnly(entry)) + .map(File::new) + .filter(File::isFile) + .filter(f -> isJarFilename(f.getName())) + .collect(Collectors.toList()); + } + + private static String extractJobClassFromSystemClasspath() throws FlinkException, IOException { + return extractJobClassNameFromFileClasspath(extractSystemClasspath()); + } + + private static String extractJobClassFromUrlClasspath(Iterable classpath) + throws IOException, FlinkException { + final List jarFilesFromClasspath = + StreamSupport.stream(classpath.spliterator(), false) + .map(url -> new File(url.getFile())) + .filter(f -> isJarFilename(f.getName())) + .collect(Collectors.toList()); + return extractJobClassNameFromFileClasspath(jarFilesFromClasspath); + } + + private static String extractJobClassNameFromFileClasspath(Iterable classpath) + throws FlinkException, IOException { + try { + return JarManifestParser.findOnlyEntryClass(classpath).getEntryClass(); + } catch (NoSuchElementException e) { + throw new FlinkException( + "No JAR found on classpath. Please provide a JAR explicitly.", e); + } catch (IllegalArgumentException e) { + throw new FlinkException( + "Multiple JAR archives with entry classes found on classpath. Please provide an entry class name.", + e); + } + } + + private static boolean userClasspathContainsJobClass( + String jobClassName, Iterable classpath) throws IOException { + for (URL url : classpath) { + if (!isJarFile(url)) { + continue; + } + + try (final JarFile jarFile = new JarFile(url.getFile())) { + if (jarContainsJobClass(jobClassName, jarFile)) { + return true; + } + } + } + return false; + } + + private static boolean jarContainsJobClass(String jobClassName, JarFile jarFile) { + return jarFile.stream() + .map(JarEntry::getName) + .filter(fileName -> fileName.endsWith(FileUtils.CLASS_FILE_EXTENSION)) + .map(FileUtils::stripFileExtension) + .map( + fileName -> + fileName.replaceAll( + Pattern.quote(File.separator), FileUtils.PACKAGE_SEPARATOR)) + .anyMatch(name -> name.equals(jobClassName)); + } + + private static boolean isJarFile(URL url) { + return isJarFilename(url.getFile()); + } + + private static boolean isJarFilename(String filename) { + return filename.endsWith(".jar"); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProvider.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProvider.java new file mode 100644 index 0000000000000..e50bc30a02cbc --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Optional; + +/** + * {@code FromJarEntryClassInformationProvider} is used for cases where the Jar archive is + * explicitly specified. + */ +public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider { + + private final File jarFile; + private final String jobClassName; + + /** + * Creates a {@code FromJarEntryClassInformationProvider} for a custom Jar archive. At least the + * {@code jarFile} or the {@code jobClassName} has to be set. + * + * @param jarFile The Jar archive. + * @param jobClassName The name of the job class. + * @return The {@code FromJarEntryClassInformationProvider} referring to the passed information. + */ + public static FromJarEntryClassInformationProvider createFromCustomJar( + File jarFile, @Nullable String jobClassName) { + return new FromJarEntryClassInformationProvider(jarFile, jobClassName); + } + + /** + * Creates a {@code FromJarEntryClassInformationProvider} for a job implemented in Python. + * + * @return A {@code FromJarEntryClassInformationProvider} for a job implemented in Python + */ + public static FromJarEntryClassInformationProvider createFromPythonJar() { + return new FromJarEntryClassInformationProvider( + new File(PackagedProgramUtils.getPythonJar().getPath()), + PackagedProgramUtils.getPythonDriverClassName()); + } + + private FromJarEntryClassInformationProvider(File jarFile, @Nullable String jobClassName) { + this.jarFile = Preconditions.checkNotNull(jarFile, "No jar archive is specified."); + this.jobClassName = jobClassName; + } + + /** + * Returns the specified {@code jarFile}. + * + * @return The specified {@code jarFile}. + * @see #getJobClassName() + */ + @Override + public Optional getJarFile() { + return Optional.of(jarFile); + } + + /** + * Returns the specified job class name that is either available in the corresponding {@code + * jarFile}. It can return an empty {@code Optional} if the job class is the entry class of the + * jar. + * + * @return Returns the job class that can be found in the respective {@code jarFile}. It can + * also return an empty {@code Optional} despite if the job class is the entry class of the + * jar. + * @see #getJarFile() + */ + @Override + public Optional getJobClassName() { + return Optional.ofNullable(jobClassName); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarManifestParser.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarManifestParser.java index e484786052e2c..a48a228904dbe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarManifestParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JarManifestParser.java @@ -34,7 +34,7 @@ import static java.util.Objects.requireNonNull; /** Utility that parses JAR manifest attributes. */ -class JarManifestParser { +public class JarManifestParser { static class JarFileWithEntryClass { private final File jarFile; @@ -118,7 +118,7 @@ static Optional findEntryClass(File jarFile) throws IOException { * @return Optional holding value of first found attribute * @throws IOException If there is an error accessing the JAR */ - private static Optional findFirstManifestAttribute(File jarFile, String... attributes) + public static Optional findFirstManifestAttribute(File jarFile, String... attributes) throws IOException { if (attributes.length == 0) { return Optional.empty(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java new file mode 100644 index 0000000000000..bcc9f127b3a0b --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.deployment.application.EntryClassInformationProvider; +import org.apache.flink.client.deployment.application.FromClasspathEntryClassInformationProvider; +import org.apache.flink.client.deployment.application.FromJarEntryClassInformationProvider; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.FunctionUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +/** + * {@code PackageProgramRetrieverImpl} is the default implementation of {@link + * PackagedProgramRetriever} that can either retrieve a {@link PackagedProgram} from a specific jar, + * some provided user classpath or the system classpath. + */ +public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever { + + private final EntryClassInformationProvider entryClassInformationProvider; + private final String[] programArguments; + private final List userClasspath; + + /** + * Creates a {@code PackageProgramRetrieverImpl} with the given parameters. + * + * @param userLibDir The user library directory that is used for generating the user classpath + * if specified. The system classpath is used if not specified. + * @param jobClassName The job class that will be used if specified. The classpath is used to + * detect any main class if not specified. + * @param programArgs The program arguments. + * @return The {@code PackageProgramRetrieverImpl} that can be used to create a {@link + * PackagedProgram} instance. + * @throws FlinkException If something goes wrong during instantiation. + */ + public static DefaultPackagedProgramRetriever create( + @Nullable File userLibDir, + @Nullable String jobClassName, + String[] programArgs) + throws FlinkException { + return create(userLibDir, null, jobClassName, programArgs); + } + + /** + * Creates a {@code PackageProgramRetrieverImpl} with the given parameters. + * + * @param userLibDir The user library directory that is used for generating the user classpath + * if specified. The system classpath is used if not specified. + * @param jarFile The jar archive expected to contain the job class included; {@code null} if + * the job class is on the system classpath. + * @param jobClassName The job class to use; if {@code null} the user classpath (or, if not set, + * the system classpath) will be scanned for possible main class. + * @param programArgs The program arguments. + * @return The {@code PackageProgramRetrieverImpl} that can be used to create a {@link + * PackagedProgram} instance. + * @throws FlinkException If something goes wrong during instantiation. + */ + public static DefaultPackagedProgramRetriever create( + @Nullable File userLibDir, + @Nullable File jarFile, + @Nullable String jobClassName, + String[] programArgs) + throws FlinkException { + List userClasspath; + try { + userClasspath = extractUserClasspath(userLibDir); + } catch (IOException e) { + throw new FlinkException("An error occurred while extracting the user classpath.", e); + } + + final EntryClassInformationProvider entryClassInformationProvider = + createEntryClassInformationProvider( + userLibDir == null ? null : userClasspath, + jarFile, + jobClassName, + programArgs); + return new DefaultPackagedProgramRetriever( + entryClassInformationProvider, programArgs, userClasspath); + } + + @VisibleForTesting + static EntryClassInformationProvider createEntryClassInformationProvider( + @Nullable Iterable userClasspath, + @Nullable File jarFile, + @Nullable String jobClassName, + String[] programArgs) + throws FlinkException { + if (PackagedProgramUtils.isPython(jobClassName) + || PackagedProgramUtils.isPython(programArgs)) { + return FromJarEntryClassInformationProvider.createFromPythonJar(); + } + + if (jarFile != null) { + return FromJarEntryClassInformationProvider.createFromCustomJar(jarFile, jobClassName); + } + + if (userClasspath != null) { + return fromUserClasspath(jobClassName, userClasspath); + } + + return fromSystemClasspath(jobClassName); + } + + private static EntryClassInformationProvider fromSystemClasspath(@Nullable String jobClassName) + throws FlinkException { + if (jobClassName != null) { + return FromClasspathEntryClassInformationProvider + .createWithJobClassAssumingOnSystemClasspath(jobClassName); + } + + try { + return FromClasspathEntryClassInformationProvider.createFromSystemClasspath(); + } catch (IOException | NoSuchElementException | IllegalArgumentException t) { + throw createGenericFlinkException(t); + } + } + + private static EntryClassInformationProvider fromUserClasspath( + @Nullable String jobClassName, Iterable userClasspath) throws FlinkException { + try { + if (jobClassName != null) { + return FromClasspathEntryClassInformationProvider.create( + jobClassName, userClasspath); + } + + return FromClasspathEntryClassInformationProvider.createFromClasspath(userClasspath); + } catch (IOException e) { + throw createGenericFlinkException(e); + } + } + + private static FlinkException createGenericFlinkException(Throwable t) { + return new FlinkException("An error occurred while access the provided classpath.", t); + } + + private DefaultPackagedProgramRetriever( + EntryClassInformationProvider entryClassInformationProvider, + String[] programArguments, + List userClasspath) { + this.entryClassInformationProvider = + Preconditions.checkNotNull( + entryClassInformationProvider, "No EntryClassInformationProvider passed."); + this.programArguments = + Preconditions.checkNotNull(programArguments, "No program parameter array passed."); + this.userClasspath = Preconditions.checkNotNull(userClasspath, "No user classpath passed."); + } + + @Override + public PackagedProgram getPackagedProgram() throws FlinkException { + try { + final PackagedProgram.Builder packagedProgramBuilder = + PackagedProgram.newBuilder() + .setUserClassPaths(userClasspath) + .setArguments(programArguments); + + entryClassInformationProvider + .getJobClassName() + .ifPresent(packagedProgramBuilder::setEntryPointClassName); + entryClassInformationProvider + .getJarFile() + .ifPresent(packagedProgramBuilder::setJarFile); + + return packagedProgramBuilder.build(); + } catch (ProgramInvocationException e) { + throw new FlinkException("Could not load the provided entrypoint class.", e); + } + } + + private static List extractUserClasspath(@Nullable File userLibDir) throws IOException { + if (userLibDir == null) { + return Collections.emptyList(); + } + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + final List relativeJarURLs = + FileUtils.listFilesInDirectory(userLibDir.toPath(), FileUtils::isJarFile).stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .collect(Collectors.toList()); + return Collections.unmodifiableList(relativeJarURLs); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java deleted file mode 100644 index 291504f448659..0000000000000 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client.deployment.application; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.PackagedProgramUtils; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.client.testjar.TestJob; -import org.apache.flink.client.testjar.TestJobInfo; -import org.apache.flink.configuration.ConfigUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.configuration.PipelineOptionsInternal; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.function.FunctionUtils; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.stream.Collectors; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasProperty; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** Tests for the {@link ClassPathPackagedProgramRetriever}. */ -public class ClassPathPackagedProgramRetrieverTest extends TestLogger { - - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @ClassRule public static final TemporaryFolder JOB_DIRS = new TemporaryFolder(); - - private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; - - /* - * The directory structure used to test - * - * userDirHasEntryClass/ - * |_jarWithEntryClass - * |_jarWithoutEntryClass - * |_textFile - * - * userDirHasNotEntryClass/ - * |_jarWithoutEntryClass - * |_textFile - */ - - private static final Collection expectedURLs = new ArrayList<>(); - - private static File userDirHasEntryClass; - - private static File userDirHasNotEntryClass; - - @BeforeClass - public static void init() throws IOException { - final String textFileName = "test.txt"; - final String userDirHasEntryClassName = "_test_user_dir_has_entry_class"; - final String userDirHasNotEntryClassName = "_test_user_dir_has_not_entry_class"; - - userDirHasEntryClass = JOB_DIRS.newFolder(userDirHasEntryClassName); - final Path userJarPath = - userDirHasEntryClass.toPath().resolve(TestJobInfo.JOB_JAR_PATH.toFile().getName()); - final Path userLibJarPath = - userDirHasEntryClass - .toPath() - .resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName()); - userDirHasNotEntryClass = JOB_DIRS.newFolder(userDirHasNotEntryClassName); - - // create files - Files.copy(TestJobInfo.JOB_JAR_PATH, userJarPath); - Files.copy(TestJobInfo.JOB_LIB_JAR_PATH, userLibJarPath); - Files.createFile(userDirHasEntryClass.toPath().resolve(textFileName)); - - Files.copy( - TestJobInfo.JOB_LIB_JAR_PATH, - userDirHasNotEntryClass - .toPath() - .resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName())); - Files.createFile(userDirHasNotEntryClass.toPath().resolve(textFileName)); - - final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); - Arrays.asList(userJarPath, userLibJarPath).stream() - .map(path -> FileUtils.relativizePath(workingDirectory, path)) - .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) - .forEach(expectedURLs::add); - } - - @Test - public void testJobGraphRetrieval() - throws IOException, FlinkException, ProgramInvocationException { - final int parallelism = 42; - final JobID jobId = new JobID(); - - final Configuration configuration = new Configuration(); - configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); - configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); - - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJobClassName(TestJob.class.getCanonicalName()) - .build(); - - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, configuration); - - assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); - assertThat( - jobGraph.getSavepointRestoreSettings(), - is(equalTo(SavepointRestoreSettings.none()))); - assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); - assertEquals(jobGraph.getJobID(), jobId); - } - - @Test - public void testJobGraphRetrievalFromJar() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarsOnClassPath(() -> Collections.singleton(testJar)) - .build(); - - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); - } - - @Test - public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = new File("non-existing"); - - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - // Both a class name is specified and a JAR "is" on the class path - // The class name should have precedence. - .setJobClassName(TestJob.class.getCanonicalName()) - .setJarsOnClassPath(() -> Collections.singleton(testJar)) - .build(); - - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); - } - - @Test - public void testSavepointRestoreSettings() - throws FlinkException, IOException, ProgramInvocationException { - final Configuration configuration = new Configuration(); - final SavepointRestoreSettings savepointRestoreSettings = - SavepointRestoreSettings.forPath("foobar", true); - final JobID jobId = new JobID(); - - configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); - SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration); - - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJobClassName(TestJob.class.getCanonicalName()) - .build(); - - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, configuration); - - assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings))); - assertEquals(jobGraph.getJobID(), jobId); - } - - @Test - public void testJarFromClassPathSupplierSanityCheck() { - Iterable jarFiles = ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get(); - - // Junit executes this test, so it should be returned as part of JARs on the class path - assertThat(jarFiles, hasItem(hasProperty("name", containsString("junit")))); - } - - @Test - public void testJarFromClassPathSupplier() throws IOException { - final File file1 = temporaryFolder.newFile(); - final File file2 = temporaryFolder.newFile(); - final File directory = temporaryFolder.newFolder(); - - // Mock java.class.path property. The empty strings are important as the shell scripts - // that prepare the Flink class path often have such entries. - final String classPath = - javaClassPath( - "", - "", - "", - file1.getAbsolutePath(), - "", - directory.getAbsolutePath(), - "", - file2.getAbsolutePath(), - "", - ""); - - Iterable jarFiles = setClassPathAndGetJarsOnClassPath(classPath); - - assertThat(jarFiles, contains(file1, file2)); - } - - @Test - public void testJobGraphRetrievalFailIfJobDirDoesNotHaveEntryClass() - throws IOException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarsOnClassPath(() -> Collections.singleton(testJar)) - .setUserLibDirectory(userDirHasNotEntryClass) - .build(); - try { - retrieveJobGraph(retrieverUnderTest, new Configuration()); - Assert.fail("This case should throw exception !"); - } catch (FlinkException e) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - e, "Failed to find job JAR on class path") - .isPresent()); - } - } - - @Test - public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir() - throws IOException, ProgramInvocationException { - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJobClassName(TestJobInfo.JOB_CLASS) - .setJarsOnClassPath(Collections::emptyList) - .setUserLibDirectory(userDirHasNotEntryClass) - .build(); - try { - retrieveJobGraph(retrieverUnderTest, new Configuration()); - Assert.fail("This case should throw class not found exception!!"); - } catch (FlinkException e) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - e, "Could not find the provided job class") - .isPresent()); - } - } - - @Test - public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() - throws IOException, FlinkException, ProgramInvocationException { - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarsOnClassPath(Collections::emptyList) - .setUserLibDirectory(userDirHasEntryClass) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat( - jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); - } - - @Test - public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() - throws IOException, FlinkException, ProgramInvocationException { - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJobClassName(TestJobInfo.JOB_CLASS) - .setJarsOnClassPath(Collections::emptyList) - .setUserLibDirectory(userDirHasEntryClass) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat( - jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); - } - - @Test - public void testRetrieveFromJarFileWithoutUserLib() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarFile(testJar) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat( - jobGraph.getUserJars(), - containsInAnyOrder(new org.apache.flink.core.fs.Path(testJar.toURI()))); - assertThat(jobGraph.getClasspaths().isEmpty(), is(true)); - } - - @Test - public void testRetrieveFromJarFileWithUserLib() - throws IOException, FlinkException, ProgramInvocationException { - final File testJar = TestJob.getTestJobJar(); - final ClassPathPackagedProgramRetriever retrieverUnderTest = - ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS) - .setJarFile(testJar) - .setUserLibDirectory(userDirHasEntryClass) - .build(); - final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); - - assertThat( - jobGraph.getUserJars(), - containsInAnyOrder(new org.apache.flink.core.fs.Path(testJar.toURI()))); - assertThat( - jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()), - containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray())); - } - - private JobGraph retrieveJobGraph( - ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) - throws FlinkException, ProgramInvocationException, MalformedURLException { - final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram(); - - final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); - ConfigUtils.encodeCollectionToConfig( - configuration, - PipelineOptions.JARS, - packagedProgram.getJobJarAndDependencies(), - URL::toString); - ConfigUtils.encodeCollectionToConfig( - configuration, - PipelineOptions.CLASSPATHS, - packagedProgram.getClasspaths(), - URL::toString); - - final Pipeline pipeline = - PackagedProgramUtils.getPipelineFromProgram( - packagedProgram, configuration, defaultParallelism, false); - return PipelineExecutorUtils.getJobGraph(pipeline, configuration); - } - - private static String javaClassPath(String... entries) { - String pathSeparator = - System.getProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.PATH_SEPARATOR); - return String.join(pathSeparator, entries); - } - - private static Iterable setClassPathAndGetJarsOnClassPath(String classPath) { - final String originalClassPath = - System.getProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH); - try { - System.setProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, classPath); - return ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get(); - } finally { - // Reset property - System.setProperty( - ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, - originalClassPath); - } - } -} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProviderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProviderTest.java new file mode 100644 index 0000000000000..872427754b632 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProviderTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.client.testjar.ClasspathProvider; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.io.FilenameUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.hamcrest.core.IsCollectionContaining; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * {@code FromClasspathEntryClassInformationProviderTest} tests {@link + * FromClasspathEntryClassInformationProvider}. + */ +public class FromClasspathEntryClassInformationProviderTest extends TestLogger { + + @Rule + public ClasspathProvider noEntryClassClasspathProvider = + ClasspathProvider.createWithNoEntryClass(); + + @Rule + public ClasspathProvider singleEntryClassClasspathProvider = + ClasspathProvider.createWithSingleEntryClass(); + + @Rule + public ClasspathProvider multipleEntryClassesClasspathProvider = + ClasspathProvider.createWithMultipleEntryClasses(); + + @Rule + public ClasspathProvider testJobEntryClassClasspathProvider = + ClasspathProvider.createWithTestJobOnly(); + + @Rule + public ClasspathProvider onlyTextFileClasspathProvider = + ClasspathProvider.createWithTextFileOnly(); + + @Test + public void testJobClassOnUserClasspathWithExplicitJobClassName() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider testInstance = + FromClasspathEntryClassInformationProvider.create( + singleEntryClassClasspathProvider.getJobClassName(), + singleEntryClassClasspathProvider.getURLUserClasspath()); + + assertThat(testInstance.getJobClassName().isPresent(), is(true)); + assertThat( + testInstance.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(testInstance.getJarFile().isPresent(), is(false)); + } + + @Test(expected = FlinkException.class) + public void testJobClassOnUserClasspathWithOnlyTestFileOnClasspath() + throws IOException, FlinkException { + // we want to check that the right exception is thrown if the user classpath is empty + FromClasspathEntryClassInformationProvider.create( + "SomeJobClassName", onlyTextFileClasspathProvider.getURLUserClasspath()); + } + + @Test(expected = NullPointerException.class) + public void testJobClassOnUserClasspathWithMissingJobClassName() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider.create( + null, singleEntryClassClasspathProvider.getURLUserClasspath()); + } + + @Test(expected = NullPointerException.class) + public void testJobClassOnUserClasspathWithMissingUserClasspath() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider.create("jobClassName", null); + } + + @Test + public void testJobClassOnUserClasspathWithoutExplicitJobClassName() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider testInstance = + FromClasspathEntryClassInformationProvider.createFromClasspath( + singleEntryClassClasspathProvider.getURLUserClasspath()); + + assertThat(testInstance.getJobClassName().isPresent(), is(true)); + assertThat( + testInstance.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(testInstance.getJarFile().isPresent(), is(false)); + } + + @Test(expected = FlinkException.class) + public void testMissingJobClassOnUserClasspathWithoutExplicitJobClassName() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider.createFromClasspath( + noEntryClassClasspathProvider.getURLUserClasspath()); + } + + @Test(expected = FlinkException.class) + public void testTooManyMainMethodsOnUserClasspath() throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider.createFromClasspath( + multipleEntryClassesClasspathProvider.getURLUserClasspath()); + } + + @Test(expected = NullPointerException.class) + public void testJobClassOnUserClasspathWithoutExplicitJobClassNameAndMissingUserClasspath() + throws IOException, FlinkException { + FromClasspathEntryClassInformationProvider.createFromClasspath(null); + } + + @Test + public void testJobClassNameFromSystemClasspath() throws IOException, FlinkException { + singleEntryClassClasspathProvider.setSystemClasspath(); + FromClasspathEntryClassInformationProvider testInstance = + FromClasspathEntryClassInformationProvider.createFromSystemClasspath(); + assertThat(testInstance.getJobClassName().isPresent(), is(true)); + assertThat( + testInstance.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(testInstance.getJarFile().isPresent(), is(false)); + } + + @Test(expected = FlinkException.class) + public void testMissingJobClassNameFromSystemClasspath() throws IOException, FlinkException { + noEntryClassClasspathProvider.setSystemClasspath(); + FromClasspathEntryClassInformationProvider.createFromSystemClasspath(); + } + + @Test(expected = FlinkException.class) + public void testTooManyMainMethodsOnSystemClasspath() throws IOException, FlinkException { + multipleEntryClassesClasspathProvider.setSystemClasspath(); + FromClasspathEntryClassInformationProvider.createFromSystemClasspath(); + } + + @Test + public void testJarFromSystemClasspathSanityCheck() { + // Junit executes this test, so it should be returned as part of JARs on the classpath + final Iterable systemClasspath = + FromClasspathEntryClassInformationProvider.extractSystemClasspath(); + assertThat( + StreamSupport.stream(systemClasspath.spliterator(), false) + .map(File::getName) + .collect(Collectors.toList()), + IsCollectionContaining.hasItem(CoreMatchers.containsString("junit"))); + } + + @Test + public void testJarFromSystemClasspath() throws MalformedURLException { + multipleEntryClassesClasspathProvider.setSystemClasspath(); + final Collection systemClasspath = + StreamSupport.stream( + FromClasspathEntryClassInformationProvider.extractSystemClasspath() + .spliterator(), + false) + .map(File::getName) + .collect(Collectors.toList()); + final Collection expectedContent = + StreamSupport.stream( + multipleEntryClassesClasspathProvider + .getURLUserClasspath() + .spliterator(), + false) + .map(URL::getPath) + .map(FilenameUtils::getName) + // we're excluding any non-jar files + .filter(name -> name.endsWith("jar")) + .collect(Collectors.toList()); + assertThat( + systemClasspath, + IsIterableContainingInAnyOrder.containsInAnyOrder(expectedContent.toArray())); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProviderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProviderTest.java new file mode 100644 index 0000000000000..d07f3f4339f0d --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProviderTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.File; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * {@code FromJarEntryClassInformationProviderTest} tests {@link + * FromJarEntryClassInformationProvider}. + */ +public class FromJarEntryClassInformationProviderTest extends TestLogger { + + @Test + public void testCustomJarFile() { + final File jarFile = new File("some/path/to/jar"); + final String jobClassName = "JobClassName"; + final FromJarEntryClassInformationProvider testInstance = + FromJarEntryClassInformationProvider.createFromCustomJar(jarFile, jobClassName); + + assertThat(testInstance.getJarFile().isPresent(), is(true)); + assertThat(testInstance.getJarFile().get(), is(jarFile)); + assertThat(testInstance.getJobClassName().isPresent(), is(true)); + assertThat(testInstance.getJobClassName().get(), is(jobClassName)); + } + + @Test(expected = NullPointerException.class) + public void testMissingJar() { + final EntryClassInformationProvider testInstance = + FromJarEntryClassInformationProvider.createFromCustomJar(null, "JobClassName"); + } + + @Test + public void testMissingJobClassName() { + final File jarFile = new File("some/path/to/jar"); + final EntryClassInformationProvider testInstance = + FromJarEntryClassInformationProvider.createFromCustomJar(jarFile, null); + assertThat(testInstance.getJarFile().isPresent(), is(true)); + assertThat(testInstance.getJarFile().get(), is(jarFile)); + assertThat(testInstance.getJobClassName().isPresent(), is(false)); + } + + @Test(expected = NullPointerException.class) + public void testEitherJobClassNameOrJarHasToBeSet() { + FromJarEntryClassInformationProvider.createFromCustomJar(null, null); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java index 17f4bd17d91b7..e9a1e0b1c92db 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java @@ -20,6 +20,7 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.testjar.TestJob; +import org.apache.flink.client.testjar.TestJobInfo; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; @@ -157,6 +158,22 @@ public void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() throws I is(equalTo(TestJob.class.getCanonicalName()))); } + @Test + public void testFindFirstManifestAttributeWithNoAttribute() throws IOException { + assertThat( + JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar()).isPresent(), + is(false)); + } + + @Test + public void testFindFirstManifestAttributeWithAttributes() throws IOException { + Optional optionalValue = + JarManifestParser.findFirstManifestAttribute( + TestJob.getTestJobJar(), PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS); + assertThat(optionalValue.isPresent(), is(true)); + assertThat(optionalValue.get(), is(TestJobInfo.TEST_JAR_JOB_CLASS)); + } + private File createJarFileWithManifest(Map manifest) throws IOException { final File jarFile = temporaryFolder.newFile(); try (ZipOutputStream zos = diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java new file mode 100644 index 0000000000000..8f1f2fd48d0f3 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.deployment.application.EntryClassInformationProvider; +import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; +import org.apache.flink.client.testjar.ClasspathProvider; +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.fail; + +/** {@code PackagedProgramRetrieverImplTest} tests {@link DefaultPackagedProgramRetriever}. */ +public class DefaultPackagedProgramRetrieverTest extends TestLogger { + + @Rule + public ClasspathProvider noEntryClassClasspathProvider = + ClasspathProvider.createWithNoEntryClass(); + + @Rule + public ClasspathProvider singleEntryClassClasspathProvider = + ClasspathProvider.createWithSingleEntryClass(); + + @Rule + public ClasspathProvider multipleEntryClassesClasspathProvider = + ClasspathProvider.createWithMultipleEntryClasses(); + + @Rule + public ClasspathProvider testJobEntryClassClasspathProvider = + ClasspathProvider.createWithTestJobOnly(); + + @Test + public void testDeriveEntryClassInformationForCustomJar() + throws FlinkException, MalformedURLException { + // clearing the system classpath to make sure that no data is collected from there + noEntryClassClasspathProvider.setSystemClasspath(); + + final String jobClassName = "SomeJobClassName"; + final File jarFile = new File("some/jar/file.jar"); + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + null, jarFile, jobClassName, new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat(informationProvider.getJobClassName().get(), is(jobClassName)); + assertThat(informationProvider.getJarFile().isPresent(), is(true)); + assertThat(informationProvider.getJarFile().get(), is(jarFile)); + } + + @Test + public void testDeriveEntryClassInformationFromSystemClasspathWithNonExistingJobClassName() + throws IOException, FlinkException { + // this test succeeds even though we could make the code fail early of we start validating + // the existing of the passed Java class on the system classpath analogously to what is done + // for the user classpath + singleEntryClassClasspathProvider.setSystemClasspath(); + + final String jobClassName = "SomeJobClassNotBeingOnTheSystemClasspath"; + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + null, null, jobClassName, new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat(informationProvider.getJobClassName().get(), is(jobClassName)); + assertThat(informationProvider.getJarFile().isPresent(), is(false)); + } + + @Test + public void testDeriveEntryClassInformationFromSystemClasspathWithExistingJobClassName() + throws IOException, FlinkException { + singleEntryClassClasspathProvider.setSystemClasspath(); + + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + null, + null, + singleEntryClassClasspathProvider.getJobClassName(), + new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat( + informationProvider.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(informationProvider.getJarFile().isPresent(), is(false)); + } + + @Test + public void testDeriveEntryClassInformationFromSystemClasspathExtractingTheJobClassFromThere() + throws IOException, FlinkException { + singleEntryClassClasspathProvider.setSystemClasspath(); + + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + null, null, null, new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat( + informationProvider.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(informationProvider.getJarFile().isPresent(), is(false)); + } + + @Test + public void testDeriveEntryClassInformationFromClasspathWithJobClass() + throws IOException, FlinkException { + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + multipleEntryClassesClasspathProvider.getURLUserClasspath(), + null, + // we have to specify the job class - otherwise the call would fail due to + // two main method being present + multipleEntryClassesClasspathProvider.getJobClassName(), + new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat( + informationProvider.getJobClassName().get(), + is(multipleEntryClassesClasspathProvider.getJobClassName())); + assertThat(informationProvider.getJarFile().isPresent(), is(false)); + } + + @Test + public void testDeriveEntryClassInformationFromClasspathWithNoJobClass() + throws IOException, FlinkException { + final EntryClassInformationProvider informationProvider = + DefaultPackagedProgramRetriever.createEntryClassInformationProvider( + singleEntryClassClasspathProvider.getURLUserClasspath(), + null, + // no job class name is specified which enables looking for the entry class + // on the user classpath + null, + new String[0]); + assertThat(informationProvider.getJobClassName().isPresent(), is(true)); + assertThat( + informationProvider.getJobClassName().get(), + is(singleEntryClassClasspathProvider.getJobClassName())); + assertThat(informationProvider.getJarFile().isPresent(), is(false)); + } + + @Test + public void testCreateWithUserLibDir() throws FlinkException { + final PackagedProgramRetriever retriever = + DefaultPackagedProgramRetriever.create( + singleEntryClassClasspathProvider.getDirectory(), + null, + singleEntryClassClasspathProvider.getJobClassName(), + new String[0]); + + // the right information is picked up without any error + assertThat( + retriever.getPackagedProgram().getMainClassName(), + is(singleEntryClassClasspathProvider.getJobClassName())); + } + + @Test + public void testJobGraphRetrieval() + throws IOException, FlinkException, ProgramInvocationException { + final int parallelism = 42; + final JobID jobId = new JobID(); + + final Configuration configuration = new Configuration(); + configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + + final String expectedSuffix = "suffix"; + final PackagedProgramRetriever retriever = + DefaultPackagedProgramRetriever.create( + null, + testJobEntryClassClasspathProvider.getJobClassName(), + ClasspathProvider.parametersForTestJob(expectedSuffix)); + + final JobGraph jobGraph = retrieveJobGraph(retriever, configuration); + + assertThat( + jobGraph.getName(), + is(testJobEntryClassClasspathProvider.getJobClassName() + "-" + expectedSuffix)); + assertThat(jobGraph.getSavepointRestoreSettings(), is(SavepointRestoreSettings.none())); + assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); + assertThat(jobGraph.getJobID(), is(jobId)); + } + + @Test + public void testJobGraphRetrievalFromJar() + throws IOException, FlinkException, ProgramInvocationException { + final String expectedSuffix = "suffix"; + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + testJobEntryClassClasspathProvider.getDirectory(), + null, + null, + ClasspathProvider.parametersForTestJob(expectedSuffix)); + + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat( + jobGraph.getName(), + is(testJobEntryClassClasspathProvider.getJobClassName() + "-" + expectedSuffix)); + } + + @Test + public void testParameterConsiderationForMultipleJobsOnSystemClasspath() + throws IOException, FlinkException, ProgramInvocationException { + final String expectedSuffix = "suffix"; + final PackagedProgramRetriever retrieverUnderTest = + // Both a class name is specified and a JAR "is" on the class path + // The class name should have precedence. + DefaultPackagedProgramRetriever.create( + null, + testJobEntryClassClasspathProvider.getJobClassName(), + ClasspathProvider.parametersForTestJob(expectedSuffix)); + + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat( + jobGraph.getName(), + is(testJobEntryClassClasspathProvider.getJobClassName() + "-suffix")); + } + + @Test + public void testSavepointRestoreSettings() + throws FlinkException, IOException, ProgramInvocationException { + final Configuration configuration = new Configuration(); + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.forPath("foobar", true); + final JobID jobId = new JobID(); + + configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration); + + final String expectedSuffix = "suffix"; + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + null, + testJobEntryClassClasspathProvider.getJobClassName(), + ClasspathProvider.parametersForTestJob(expectedSuffix)); + + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, configuration); + + assertThat(jobGraph.getSavepointRestoreSettings(), is(savepointRestoreSettings)); + assertThat(jobGraph.getJobID(), is(jobId)); + } + + @Test + public void testFailIfJobDirDoesNotHaveEntryClass() { + try { + DefaultPackagedProgramRetriever.create( + noEntryClassClasspathProvider.getDirectory(), + testJobEntryClassClasspathProvider.getJobClassName(), + ClasspathProvider.parametersForTestJob("suffix")); + fail("This case should throw exception !"); + } catch (FlinkException e) { + assertThat( + e, + FlinkMatchers.containsMessage( + String.format( + "Could not find the provided job class (%s) in the user lib directory.", + testJobEntryClassClasspathProvider.getJobClassName()))); + } + } + + @Test(expected = FlinkException.class) + public void testEntryClassNotFoundOnSystemClasspath() throws FlinkException { + final PackagedProgramRetriever testInstance = + DefaultPackagedProgramRetriever.create( + null, "NotExistingClass", new String[0]); + // the getPackagedProgram fails do to the missing class. We could make it fail earlier by + // validating the existence of the passed Java class on the system classpath (analogously to + // what we already do for the user classpath) + // see testDeriveEntryClassInformationFromSystemClasspathWithNonExistingJobClassName + testInstance.getPackagedProgram(); + } + + @Test(expected = FlinkException.class) + public void testEntryClassNotFoundOnUserClasspath() throws FlinkException { + DefaultPackagedProgramRetriever.create( + noEntryClassClasspathProvider.getDirectory(), + "NotExistingClass", + new String[0]); + } + + @Test(expected = FlinkException.class) + public void testWithoutJobClassAndMultipleEntryClassesOnUserClasspath() throws FlinkException { + // without a job class name specified deriving the entry class from classpath is impossible + // if the classpath contains multiple classes with main methods + DefaultPackagedProgramRetriever.create( + multipleEntryClassesClasspathProvider.getDirectory(), + null, + new String[0]); + } + + @Test(expected = FlinkException.class) + public void testWithoutJobClassAndMultipleEntryClassesOnSystemClasspath() + throws FlinkException { + DefaultPackagedProgramRetriever.create(null, null, new String[0]); + } + + @Test + public void testWithJobClassAndMultipleEntryClassesOnUserClasspath() throws FlinkException { + final DefaultPackagedProgramRetriever retriever = + DefaultPackagedProgramRetriever.create( + multipleEntryClassesClasspathProvider.getDirectory(), + multipleEntryClassesClasspathProvider.getJobClassName(), + new String[0]); + assertThat( + retriever.getPackagedProgram().getMainClassName(), + is(multipleEntryClassesClasspathProvider.getJobClassName())); + } + + @Test + public void testWithJobClassAndMultipleEntryClassesOnSystemClasspath() + throws FlinkException, MalformedURLException { + multipleEntryClassesClasspathProvider.setSystemClasspath(); + + final DefaultPackagedProgramRetriever retriever = + DefaultPackagedProgramRetriever.create( + null, + multipleEntryClassesClasspathProvider.getJobClassName(), + new String[0]); + assertThat( + retriever.getPackagedProgram().getMainClassName(), + is(multipleEntryClassesClasspathProvider.getJobClassName())); + } + + @Test + public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + singleEntryClassClasspathProvider.getDirectory(), + null, + ClasspathProvider.parametersForTestJob("suffix")); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + final List actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + + final List expectedClasspath = + extractRelativizedURLsForJarsFromDirectory( + singleEntryClassClasspathProvider.getDirectory()); + + assertThat( + actualClasspath, + IsIterableContainingInAnyOrder.containsInAnyOrder(expectedClasspath.toArray())); + } + + @Test + public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + singleEntryClassClasspathProvider.getDirectory(), + singleEntryClassClasspathProvider.getJobClassName(), + ClasspathProvider.parametersForTestJob("suffix")); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + final List actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + + final List expectedClasspath = + extractRelativizedURLsForJarsFromDirectory( + singleEntryClassClasspathProvider.getDirectory()); + + assertThat( + actualClasspath, + IsIterableContainingInAnyOrder.containsInAnyOrder(expectedClasspath.toArray())); + } + + @Test + public void testRetrieveFromJarFileWithoutUserLib() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + null, + testJobEntryClassClasspathProvider.getJobJar(), + null, + ClasspathProvider.parametersForTestJob("suffix")); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat( + jobGraph.getUserJars(), + containsInAnyOrder( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI()))); + assertThat(jobGraph.getClasspaths().isEmpty(), is(true)); + } + + @Test + public void testRetrieveFromJarFileWithUserLib() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + singleEntryClassClasspathProvider.getDirectory(), + // the testJob jar is not on the user classpath + testJobEntryClassClasspathProvider.getJobJar(), + null, + ClasspathProvider.parametersForTestJob("suffix")); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat( + jobGraph.getUserJars(), + containsInAnyOrder( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI()))); + final List actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List expectedClasspath = + extractRelativizedURLsForJarsFromDirectory( + singleEntryClassClasspathProvider.getDirectory()); + + assertThat( + actualClasspath, + IsIterableContainingInAnyOrder.containsInAnyOrder(expectedClasspath.toArray())); + } + + private JobGraph retrieveJobGraph( + PackagedProgramRetriever retrieverUnderTest, Configuration configuration) + throws FlinkException, ProgramInvocationException, MalformedURLException { + final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram(); + + final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); + ConfigUtils.encodeCollectionToConfig( + configuration, + PipelineOptions.JARS, + packagedProgram.getJobJarAndDependencies(), + URL::toString); + ConfigUtils.encodeCollectionToConfig( + configuration, + PipelineOptions.CLASSPATHS, + packagedProgram.getClasspaths(), + URL::toString); + + final Pipeline pipeline = + PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, configuration, defaultParallelism, false); + return PipelineExecutorUtils.getJobGraph(pipeline, configuration); + } + + private static List extractRelativizedURLsForJarsFromDirectory(File directory) + throws MalformedURLException { + Preconditions.checkArgument( + directory.listFiles() != null, + "The passed File does not seem to be a directory or is not acessible: " + + directory.getAbsolutePath()); + + final List relativizedURLs = new ArrayList<>(); + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + for (File file : Preconditions.checkNotNull(directory.listFiles())) { + if (!FileUtils.isJarFile(file.toPath())) { + // any non-JARs are filtered by PackagedProgramRetrieverImpl + continue; + } + + Path relativePath = FileUtils.relativizePath(workingDirectory, file.toPath()); + relativizedURLs.add(FileUtils.toURL(relativePath).toString()); + } + + return relativizedURLs; + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProvider.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProvider.java new file mode 100644 index 0000000000000..4d37a74f2f903 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProvider.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.testjar; + +import org.apache.flink.client.deployment.application.JarManifestParser; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * {@code ClasspathProvider} offers utility methods for creating a classpath based on actual jars. + */ +public class ClasspathProvider extends ExternalResource { + + private static final String CLASSPATH_PROPERTY_NAME = "java.class.path"; + + private static final Path TEST_JOB_JAR_PATH = Paths.get("target", "maven-test-jar.jar"); + + private static final Path JOB_JAR_PATH = + Paths.get("target", "maven-test-user-classloader-job-jar.jar"); + private static final Path JOB_LIB_JAR_PATH = + Paths.get("target", "maven-test-user-classloader-job-lib-jar.jar"); + + private final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private final String directoryNameSuffix; + private final ThrowingConsumer directoryContentCreator; + @Nullable private final String jobClassName; + @Nullable private final File jarFile; + + private File directory; + + private final String originalSystemClasspath; + + public static ClasspathProvider createWithNoEntryClass() { + return new ClasspathProvider( + "_user_dir_with_no_entry_class", + directory -> { + copyJar(JOB_LIB_JAR_PATH, directory); + createTestFile(directory); + }); + } + + public static ClasspathProvider createWithSingleEntryClass() { + return new ClasspathProvider( + "_user_dir_with_single_entry_class", + directory -> { + copyJar(JOB_LIB_JAR_PATH, directory); + copyJar(JOB_JAR_PATH, directory); + createTestFile(directory); + }, + JOB_JAR_PATH.toFile()); + } + + public static ClasspathProvider createWithMultipleEntryClasses() { + return new ClasspathProvider( + "_user_dir_with_multiple_entry_classes", + directory -> { + copyJar(JOB_LIB_JAR_PATH, directory); + // first jar with main method + copyJar(JOB_JAR_PATH, directory); + // second jar with main method + copyJar(TEST_JOB_JAR_PATH, directory); + createTestFile(directory); + }, + TEST_JOB_JAR_PATH.toFile()); + } + + public static ClasspathProvider createWithTestJobOnly() { + return new ClasspathProvider( + "_user_dir_with_testjob_entry_class_only", + directory -> copyJar(TEST_JOB_JAR_PATH, directory), + TEST_JOB_JAR_PATH.toFile()); + } + + public static String[] parametersForTestJob(String strValue) { + return new String[] {"--arg", strValue}; + } + + public static ClasspathProvider createWithTextFileOnly() { + return new ClasspathProvider( + "_user_dir_with_text_file_only", ClasspathProvider::createTestFile); + } + + private static void copyJar(Path sourcePath, File targetDir) throws IOException { + Files.copy(sourcePath, targetDir.toPath().resolve(sourcePath.toFile().getName())); + } + + private static void createTestFile(File targetDir) throws IOException { + Files.createFile(targetDir.toPath().resolve("test.txt")); + } + + private ClasspathProvider( + String directoryNameSuffix, + ThrowingConsumer directoryContentCreator) { + this(directoryNameSuffix, directoryContentCreator, null, null); + } + + private ClasspathProvider( + String directoryNameSuffix, + ThrowingConsumer directoryContentCreator, + File jarFile) { + this( + directoryNameSuffix, + directoryContentCreator, + jarFile, + extractEntryClassNameFromJar(jarFile)); + } + + private ClasspathProvider( + String directoryNameSuffix, + ThrowingConsumer directoryContentCreator, + @Nullable File jarFile, + @Nullable String jobClassName) { + this.directoryNameSuffix = + Preconditions.checkNotNull(directoryNameSuffix, "No directory specified."); + this.directoryContentCreator = + Preconditions.checkNotNull( + directoryContentCreator, "No logic for filling the directory specified."); + this.jarFile = jarFile; + this.jobClassName = jobClassName; + + this.originalSystemClasspath = System.getProperty(CLASSPATH_PROPERTY_NAME); + } + + @Override + public void before() throws IOException { + temporaryFolder.create(); + + directory = temporaryFolder.newFolder(directoryNameSuffix); + directoryContentCreator.accept(directory); + } + + @Override + protected void after() { + temporaryFolder.delete(); + resetSystemClasspath(); + } + + @Nullable + public String getJobClassName() { + return jobClassName; + } + + public File getJobJar() { + if (jarFile == null) { + throw new UnsupportedOperationException( + "There's no job jar specified for " + directory.getName()); + } + + return jarFile; + } + + private static String extractEntryClassNameFromJar(File f) { + try { + return JarManifestParser.findFirstManifestAttribute( + f, PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS) + .orElseThrow( + () -> + new IllegalArgumentException( + "The passed file does not contain a main class: " + + f.getAbsolutePath())); + } catch (Throwable t) { + throw new AssertionError( + "Something went wrong with retrieving the main class from " + + f.getAbsolutePath(), + t); + } + } + + public File getDirectory() { + return directory; + } + + public Iterable getURLUserClasspath() throws MalformedURLException { + List list = new ArrayList<>(); + for (File file : getFileUserClasspath(getDirectory())) { + list.add(file.toURI().toURL()); + } + return list; + } + + public void setSystemClasspath() throws MalformedURLException { + final String classpathStr = generateClasspathString(getURLUserClasspath()); + System.setProperty(CLASSPATH_PROPERTY_NAME, classpathStr); + } + + public void resetSystemClasspath() { + System.setProperty(CLASSPATH_PROPERTY_NAME, originalSystemClasspath); + } + + private static String generateClasspathString(Iterable classpath) { + final String pathSeparator = System.getProperty("path.separator"); + return StreamSupport.stream(classpath.spliterator(), false) + .map(URL::toString) + .collect(Collectors.joining(pathSeparator)); + } + + private static List getFileUserClasspath(File parentFolder) { + return Arrays.asList(Objects.requireNonNull(parentFolder.listFiles())); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJob.java index 481ee262ad753..5c0f8f8fda4b5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJob.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJob.java @@ -19,7 +19,6 @@ package org.apache.flink.client.testjar; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetrieverTest; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -28,7 +27,7 @@ import java.io.File; import java.io.FileNotFoundException; -/** Test job which is used for {@link ClassPathPackagedProgramRetrieverTest}. */ +/** Test job which is used for {@link PackagedProgramRetrieverImplTest}. */ public class TestJob { public static void main(String[] args) throws Exception { diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJobInfo.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJobInfo.java index 2f5c0e5148205..744bb342a8b8b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJobInfo.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/TestJobInfo.java @@ -24,9 +24,11 @@ /** The test job information. */ public class TestJobInfo { + public static final String TEST_JAR_JOB_CLASS = "org.apache.flink.client.testjar.TestJob"; public static final String JOB_CLASS = "org.apache.flink.client.testjar.TestUserClassLoaderJob"; public static final String JOB_LIB_CLASS = "org.apache.flink.client.testjar.TestUserClassLoaderJobLib"; + public static final Path TEST_JOB_JAR_PATH = Paths.get("target", "maven-test-jar.jar"); public static final Path JOB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-jar.jar"); public static final Path JOB_LIB_JAR_PATH = diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java index 48697ea282cf4..b1bb21147a429 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; -import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; import org.apache.flink.configuration.Configuration; @@ -36,10 +36,7 @@ import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.FlinkException; -import javax.annotation.Nullable; - import java.io.File; -import java.io.IOException; /** An {@link ApplicationClusterEntryPoint} which is started with a job in a predefined location. */ @Internal @@ -102,24 +99,16 @@ static Configuration loadConfigurationFromClusterConfig( private static PackagedProgram getPackagedProgram( final StandaloneApplicationClusterConfiguration clusterConfiguration) - throws IOException, FlinkException { + throws FlinkException { + final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null); final PackagedProgramRetriever programRetriever = - getPackagedProgramRetriever( - clusterConfiguration.getArgs(), clusterConfiguration.getJobClassName()); + DefaultPackagedProgramRetriever.create( + userLibDir, + clusterConfiguration.getJobClassName(), + clusterConfiguration.getArgs()); return programRetriever.getPackagedProgram(); } - private static PackagedProgramRetriever getPackagedProgramRetriever( - final String[] programArguments, @Nullable final String jobClassName) - throws IOException { - final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null); - final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = - ClassPathPackagedProgramRetriever.newBuilder(programArguments) - .setUserLibDirectory(userLibDir) - .setJobClassName(jobClassName); - return retrieverBuilder.build(); - } - private static void setStaticJobId( StandaloneApplicationClusterConfiguration clusterConfiguration, Configuration configuration) { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java index 9254e50c03fce..c2bd2734421af 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ApplicationConfiguration; -import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgramUtils; @@ -39,7 +39,6 @@ import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; import java.util.List; /** An {@link ApplicationClusterEntryPoint} for Kubernetes. */ @@ -88,7 +87,7 @@ public static void main(final String[] args) { } private static PackagedProgram getPackagedProgram(final Configuration configuration) - throws IOException, FlinkException { + throws FlinkException { final ApplicationConfiguration applicationConfiguration = ApplicationConfiguration.fromConfiguration(configuration); @@ -105,13 +104,9 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( final Configuration configuration, final String[] programArguments, @Nullable final String jobClassName) - throws IOException { + throws FlinkException { final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null); - final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = - ClassPathPackagedProgramRetriever.newBuilder(programArguments) - .setUserLibDirectory(userLibDir) - .setJobClassName(jobClassName); // No need to do pipelineJars validation if it is a PyFlink job. if (!(PackagedProgramUtils.isPython(jobClassName) @@ -119,8 +114,11 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(configuration); Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); - retrieverBuilder.setJarFile(pipelineJars.get(0)); + return DefaultPackagedProgramRetriever.create( + userLibDir, pipelineJars.get(0), jobClassName, programArguments); } - return retrieverBuilder.build(); + + return DefaultPackagedProgramRetriever.create( + userLibDir, jobClassName, programArguments); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java index 21ff7a2cbac5f..1d6c986d8d5de 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ApplicationConfiguration; -import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; import org.apache.flink.configuration.Configuration; @@ -112,7 +112,7 @@ public static void main(final String[] args) { } private static PackagedProgram getPackagedProgram(final Configuration configuration) - throws IOException, FlinkException { + throws FlinkException { final ApplicationConfiguration applicationConfiguration = ApplicationConfiguration.fromConfiguration(configuration); @@ -129,16 +129,12 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( final Configuration configuration, final String[] programArguments, @Nullable final String jobClassName) - throws IOException { + throws FlinkException { final File userLibDir = YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null); final File userApplicationJar = getUserApplicationJar(userLibDir, configuration); - final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = - ClassPathPackagedProgramRetriever.newBuilder(programArguments) - .setUserLibDirectory(userLibDir) - .setJarFile(userApplicationJar) - .setJobClassName(jobClassName); - return retrieverBuilder.build(); + return DefaultPackagedProgramRetriever.create( + userLibDir, userApplicationJar, jobClassName, programArguments); } private static File getUserApplicationJar(