diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 6f6e933265380..9552978c35464 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -796,9 +796,12 @@ PackagedProgram buildProgram( jarFile = getJarFile(jarFilePath); } - PackagedProgram program = entryPointClass == null ? - new PackagedProgram(jarFile, classpaths, programArgs) : - new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); + PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(jarFile) + .setUserClassPaths(classpaths) + .setEntryPointClassName(entryPointClass) + .setArguments(programArgs) + .build(); program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 2f765b14ed7ec..0a09367d3ae2d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -54,6 +54,10 @@ import java.util.jar.JarFile; import java.util.jar.Manifest; +import static org.apache.flink.client.program.PackagedProgramUtils.isPython; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * This class encapsulates represents a program, packaged in a jar file. It supplies * functionality to extract nested libraries, search for the program entry point, and extract @@ -92,88 +96,27 @@ public class PackagedProgram { */ private final boolean isPython; - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * argument. - * - * @param jarFile - * The jar file which contains the plan and a Manifest which defines - * the program-class - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException { - this(jarFile, Collections.emptyList(), null, args); - } - - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * argument. - * - * @param jarFile - * The jar file which contains the plan and a Manifest which defines - * the program-class - * @param classpaths - * Additional classpath URLs needed by the Program. - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, List classpaths, String... args) throws ProgramInvocationException { - this(jarFile, classpaths, null, args); - } - /** * Creates an instance that wraps the plan defined in the jar file using the given * arguments. For generating the plan the class defined in the className parameter * is used. * - * @param jarFile - * The jar file which contains the plan. - * @param entryPointClassName - * Name of the class which generates the plan. Overrides the class defined - * in the jar file manifest - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. + * @param jarFile The jar file which contains the plan. + * @param classpaths Additional classpath URLs needed by the Program. + * @param entryPointClassName Name of the class which generates the plan. Overrides the class defined + * in the jar file manifest + * @param args Optional. The arguments used to create the pact plan, depend on + * implementation of the pact plan. See getDescription(). + * @throws ProgramInvocationException This invocation is thrown if the Program can't be properly loaded. Causes + * may be a missing / wrong class or manifest files. */ - public PackagedProgram(File jarFile, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { - this(jarFile, Collections.emptyList(), entryPointClassName, args); - } + private PackagedProgram(@Nullable File jarFile, List classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { + checkNotNull(classpaths); + checkNotNull(args); + checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null."); - /** - * Creates an instance that wraps the plan defined in the jar file using the given - * arguments. For generating the plan the class defined in the className parameter - * is used. - * - * @param jarFile - * The jar file which contains the plan. - * @param classpaths - * Additional classpath URLs needed by the Program. - * @param entryPointClassName - * Name of the class which generates the plan. Overrides the class defined - * in the jar file manifest - * @param args - * Optional. The arguments used to create the pact plan, depend on - * implementation of the pact plan. See getDescription(). - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. - */ - public PackagedProgram(File jarFile, List classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { // Whether the job is a Python job. - isPython = entryPointClassName != null && (entryPointClassName.equals("org.apache.flink.client.python.PythonDriver") - || entryPointClassName.equals("org.apache.flink.client.python.PythonGatewayServer")); + isPython = isPython(entryPointClassName); URL jarFileUrl = null; if (jarFile != null) { @@ -184,12 +127,10 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } checkJarFile(jarFileUrl); - } else if (!isPython) { - throw new IllegalArgumentException("The jar file must not be null."); } this.jarFile = jarFileUrl; - this.args = args == null ? new String[0] : args; + this.args = args; // if no entryPointClassName name was given, we try and look one up through the manifest if (entryPointClassName == null) { @@ -209,23 +150,6 @@ public PackagedProgram(File jarFile, List classpaths, @Nullable String entr } } - public PackagedProgram(Class entryPointClass, String... args) throws ProgramInvocationException { - this.jarFile = null; - this.args = args == null ? new String[0] : args; - - this.extractedTempLibraries = Collections.emptyList(); - this.classpaths = Collections.emptyList(); - this.userCodeClassLoader = entryPointClass.getClassLoader(); - - // load the entry point class - this.mainClass = entryPointClass; - isPython = entryPointClass.getCanonicalName().equals("org.apache.flink.client.python.PythonDriver"); - - if (!hasMainMethod(mainClass)) { - throw new ProgramInvocationException("The given program class does not have a main(String[]) method."); - } - } - public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings) { this.savepointSettings = savepointSettings; } @@ -247,9 +171,8 @@ public String getMainClassName() { * may contain a description of the plan itself and its arguments. * * @return The description of the PactProgram's input parameters. - * @throws ProgramInvocationException - * This invocation is thrown if the Program can't be properly loaded. Causes - * may be a missing / wrong class or manifest files. + * @throws ProgramInvocationException This invocation is thrown if the Program can't be properly loaded. Causes + * may be a missing / wrong class or manifest files. */ @Nullable public String getDescription() throws ProgramInvocationException { @@ -258,17 +181,16 @@ public String getDescription() throws ProgramInvocationException { ProgramDescription descr; try { descr = InstantiationUtil.instantiate( - this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class); + this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class); } catch (Throwable t) { return null; } try { return descr.getDescription(); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Error while getting the program description" + - (t.getMessage() == null ? "." : ": " + t.getMessage()), t); + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } else { @@ -280,7 +202,7 @@ public String getDescription() throws ProgramInvocationException { * This method assumes that the context environment is prepared, or the execution * will be a local execution by default. */ - public void invokeInteractiveModeForExecution() throws ProgramInvocationException{ + public void invokeInteractiveModeForExecution() throws ProgramInvocationException { callMainMethod(mainClass, args); } @@ -314,8 +236,7 @@ public List getAllLibraries() { for (File tmpLib : this.extractedTempLibraries) { try { libs.add(tmpLib.getAbsoluteFile().toURI().toURL()); - } - catch (MalformedURLException e) { + } catch (MalformedURLException e) { throw new RuntimeException("URL is invalid. This should not happen.", e); } } @@ -367,10 +288,9 @@ private static boolean hasMainMethod(Class entryClass) { mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { return false; - } - catch (Throwable t) { + } catch (Throwable t) { throw new RuntimeException("Could not look up the main(String[]) method from the class " + - entryClass.getName() + ": " + t.getMessage(), t); + entryClass.getName() + ": " + t.getMessage(), t); } return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers()); @@ -386,10 +306,9 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr mainMethod = entryClass.getMethod("main", String[].class); } catch (NoSuchMethodException e) { throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method."); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " + - entryClass.getName() + ": " + t.getMessage(), t); + entryClass.getName() + ": " + t.getMessage(), t); } if (!Modifier.isStatic(mainMethod.getModifiers())) { @@ -401,14 +320,11 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr try { mainMethod.invoke(null, (Object) args); - } - catch (IllegalArgumentException e) { + } catch (IllegalArgumentException e) { throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e); - } - catch (IllegalAccessException e) { + } catch (IllegalAccessException e) { throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e); - } - catch (InvocationTargetException e) { + } catch (InvocationTargetException e) { Throwable exceptionInMethod = e.getTargetException(); if (exceptionInMethod instanceof Error) { throw (Error) exceptionInMethod; @@ -419,8 +335,7 @@ private static void callMainMethod(Class entryClass, String[] args) throws Pr } else { throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod); } - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t); } } @@ -468,10 +383,9 @@ private static String getEntryPointClassNameFromJar(URL jarFile) throws ProgramI return className; } else { throw new ProgramInvocationException("Neither a '" + MANIFEST_ATTRIBUTE_MAIN_CLASS + "', nor a '" + - MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file."); + MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file."); } - } - finally { + } finally { try { jar.close(); } catch (Throwable t) { @@ -486,20 +400,16 @@ private static Class loadMainClass(String className, ClassLoader cl) throws P contextCl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); return Class.forName(className, false, cl); - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' was not found in the jar file.", e); - } - catch (ExceptionInInitializerError e) { + } catch (ExceptionInInitializerError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' threw an error during initialization.", e); - } - catch (LinkageError e) { + } catch (LinkageError e) { throw new ProgramInvocationException("The program's entry point class '" + className + "' could not be loaded due to a linkage failure.", e); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("The program's entry point class '" + className + "' caused an exception during initialization: " + t.getMessage(), t); } finally { @@ -537,8 +447,7 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn if (containedJarFileEntries.isEmpty()) { return Collections.emptyList(); - } - else { + } else { // go over all contained jar files final List extractedTempLibraries = new ArrayList(containedJarFileEntries.size()); final byte[] buffer = new byte[4096]; @@ -557,11 +466,10 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn try { tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name); tempFile.deleteOnExit(); - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException( "An I/O error occurred while creating temporary file to extract nested library '" + - entry.getName() + "'.", e); + entry.getName() + "'.", e); } extractedTempLibraries.add(tempFile); @@ -578,12 +486,10 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn while ((numRead = in.read(buffer)) != -1) { out.write(buffer, 0, numRead); } - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException("An I/O error occurred while extracting nested library '" - + entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'."); - } - finally { + + entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'."); + } finally { if (out != null) { out.close(); } @@ -594,8 +500,7 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn } incomplete = false; - } - finally { + } finally { if (incomplete) { deleteExtractedLibraries(extractedTempLibraries); } @@ -603,15 +508,14 @@ public static List extractContainedLibraries(URL jarFile) throws ProgramIn return extractedTempLibraries; } - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Unknown I/O error while extracting contained jar files.", t); - } - finally { + } finally { if (jar != null) { try { jar.close(); - } catch (Throwable t) {} + } catch (Throwable t) { + } } } } @@ -625,13 +529,64 @@ public static void deleteExtractedLibraries(List tempLibraries) { private static void checkJarFile(URL jarfile) throws ProgramInvocationException { try { ClientUtils.checkJarFile(jarfile); - } - catch (IOException e) { + } catch (IOException e) { throw new ProgramInvocationException(e.getMessage(), e); - } - catch (Throwable t) { + } catch (Throwable t) { throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t); } } + /** + * A Builder For {@link PackagedProgram}. + */ + public static class Builder { + + @Nullable + private File jarFile; + + @Nullable + private String entryPointClassName; + + private String[] args = new String[0]; + + private List userClassPaths = Collections.emptyList(); + + public Builder setJarFile(@Nullable File jarFile) { + this.jarFile = jarFile; + return this; + } + + public Builder setUserClassPaths(List userClassPaths) { + this.userClassPaths = userClassPaths; + return this; + } + + public Builder setEntryPointClassName(@Nullable String entryPointClassName) { + this.entryPointClassName = entryPointClassName; + return this; + } + + public Builder setArguments(String... args) { + this.args = args; + return this; + } + + public PackagedProgram build() throws ProgramInvocationException { + if (jarFile == null && entryPointClassName == null) { + throw new IllegalArgumentException("The jarFile and entryPointClassName can not be null at the same time."); + } + return new PackagedProgram( + jarFile, + userClassPaths, + entryPointClassName, + args); + } + + private Builder() { + } + } + + public static Builder newBuilder() { + return new Builder(); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index fa9f8b0486d15..2f2719318a772 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -33,6 +33,9 @@ */ public class PackagedProgramUtils { + private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"; + + private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer"; /** * Creates a {@link JobGraph} with a specified {@link JobID} * from the given {@link PackagedProgram}. @@ -102,5 +105,10 @@ public static Pipeline getPipelineFromProgram(PackagedProgram prog, int parallel } } + public static Boolean isPython(String entryPointClassName) { + return (entryPointClassName != null) && + (entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME)); + } + private PackagedProgramUtils() {} } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index 2efb8ca9bc258..2b64c8238fb34 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -97,6 +97,7 @@ public void testFileNotJarFile() throws Exception { ProgramOptions programOptions = mock(ProgramOptions.class); ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class); when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath()); + when(programOptions.getProgramArgs()).thenReturn(new String[0]); try { frontend.buildProgram(programOptions, executionOptions); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 6c6ec1dfc85a8..78023d7cd2e08 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { public void testDetachedMode() throws Exception{ final ClusterClient clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster()); try { - PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -109,7 +109,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestEager.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -119,7 +119,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetRuntime.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -129,7 +129,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -139,7 +139,7 @@ public void testDetachedMode() throws Exception{ } try { - PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class); + PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build(); ClientUtils.executeProgram(clusterClient, prg, 1, true); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { @@ -194,7 +194,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { @Test public void testGetExecutionPlan() throws ProgramInvocationException { - PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp"); + PackagedProgram prg = PackagedProgram.newBuilder() + .setEntryPointClassName(TestOptimizerPlan.class.getName()) + .setArguments("/dev/random", "/tmp") + .build(); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index 1b52f377c7497..6343a2f22277f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -49,7 +49,10 @@ public class ExecutionPlanCreationTest { @Test public void testGetExecutionPlan() { try { - PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp"); + PackagedProgram prg = PackagedProgram.newBuilder() + .setEntryPointClassName(TestOptimizerPlan.class.getName()) + .setArguments("/dev/random", "/tmp") + .build(); InetAddress mockAddress = InetAddress.getLocalHost(); InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java index 7a7cf64a1ac34..c30d3f518c8e8 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.client.cli.CliFrontendTestUtils; import org.apache.flink.configuration.ConfigConstants; import org.junit.Assert; @@ -28,10 +29,13 @@ import java.io.File; import java.io.FileOutputStream; import java.nio.file.Files; +import java.util.Collections; import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS; + /** * Tests for the {@link PackagedProgram}. */ @@ -57,8 +61,15 @@ public void testExtractContainedLibraries() throws Exception { Assert.assertArrayEquals(nestedJarContent, Files.readAllBytes(files.iterator().next().toPath())); } - private static final class NullOutputStream extends java.io.OutputStream { - @Override - public void write(int b) {} + @Test + public void testNotThrowExceptionWhenJarFileIsNull() throws Exception { + PackagedProgram.newBuilder() + .setUserClassPaths(Collections.singletonList(new File(CliFrontendTestUtils.getTestJarPath()).toURI().toURL())) + .setEntryPointClassName(TEST_JAR_MAIN_CLASS); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuilderThrowExceptionIfjarFileAndEntryPointClassNameAreBothNull() throws ProgramInvocationException { + PackagedProgram.newBuilder().build(); } } diff --git a/flink-container/docker/Dockerfile b/flink-container/docker/Dockerfile index a68835edff20b..a0d3d8fb1bce2 100644 --- a/flink-container/docker/Dockerfile +++ b/flink-container/docker/Dockerfile @@ -28,6 +28,7 @@ ENV FLINK_LIB_DIR $FLINK_HOME/lib ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins ENV FLINK_OPT_DIR $FLINK_HOME/opt ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts +ENV FLINK_USR_LIB_DIR $FLINK_HOME/usrlib ENV PATH $PATH:$FLINK_HOME/bin # flink-dist can point to a directory or a tarball on the local system @@ -51,7 +52,7 @@ ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/ RUN set -x && \ ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \ - for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \ + ln -s $FLINK_JOB_ARTIFACTS_DIR $FLINK_USR_LIB_DIR && \ if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python*.jar $FLINK_LIB_DIR; fi && \ if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \ addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \ diff --git a/flink-container/docker/docker-entrypoint.sh b/flink-container/docker/docker-entrypoint.sh index 0bf7c04fa93c0..0c1df000acb9b 100755 --- a/flink-container/docker/docker-entrypoint.sh +++ b/flink-container/docker/docker-entrypoint.sh @@ -19,7 +19,7 @@ ################################################################################ ### If unspecified, the hostname of the container is taken as the JobManager address -FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"} +FLINK_HOME=${FLINK_HOME:-"/opt/flink"} JOB_CLUSTER="job-cluster" TASK_MANAGER="task-manager" diff --git a/flink-container/pom.xml b/flink-container/pom.xml index 78dfe3d909c58..e94579b39d641 100644 --- a/flink-container/pom.xml +++ b/flink-container/pom.xml @@ -95,6 +95,66 @@ under the License. + + create-test-dependency-user-jar + process-test-classes + + single + + + + + org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob + + + maven + false + + src/test/assembly/test-assembly-test-user-classloader-job-jar.xml + + + + + create-test-dependency-user-jar-depend + process-test-classes + + single + + + maven + false + + src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml + + + + + + + + maven-clean-plugin + 2.5 + + + remove-externaltestclasses + process-test-classes + + clean + + + true + + + ${project.build.testOutputDirectory} + + **/testjar/TestUser*.class + + + + + diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 57808e3328506..9ed109a49023d 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -26,9 +26,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.container.entrypoint.JarManifestParser.JarFileWithEntryClass; +import org.apache.flink.runtime.entrypoint.component.AbstractUserClassPathJobGraphRetriever; import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.slf4j.Logger; @@ -39,9 +42,14 @@ import java.io.File; import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.NoSuchElementException; import java.util.function.Supplier; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -50,7 +58,7 @@ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class * on the class path. */ -class ClassPathJobGraphRetriever implements JobGraphRetriever { +class ClassPathJobGraphRetriever extends AbstractUserClassPathJobGraphRetriever { private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class); @@ -69,26 +77,23 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever { @Nonnull private final Supplier> jarsOnClassPath; - ClassPathJobGraphRetriever( - @Nonnull JobID jobId, - @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments, - @Nullable String jobClassName) { - this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE); - } + @Nullable + private final File userLibDirectory; - @VisibleForTesting - ClassPathJobGraphRetriever( - @Nonnull JobID jobId, - @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments, - @Nullable String jobClassName, - @Nonnull Supplier> jarsOnClassPath) { + private ClassPathJobGraphRetriever( + @Nonnull JobID jobId, + @Nonnull SavepointRestoreSettings savepointRestoreSettings, + @Nonnull String[] programArguments, + @Nullable String jobClassName, + @Nonnull Supplier> jarsOnClassPath, + @Nullable File userLibDirectory) throws IOException { + super(userLibDirectory); + this.userLibDirectory = userLibDirectory; this.jobId = requireNonNull(jobId, "jobId"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.programArguments = requireNonNull(programArguments, "programArguments"); this.jobClassName = jobClassName; - this.jarsOnClassPath = requireNonNull(jarsOnClassPath, "jarsOnClassPath"); + this.jarsOnClassPath = requireNonNull(jarsOnClassPath); } @Override @@ -112,15 +117,28 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept private PackagedProgram createPackagedProgram() throws FlinkException { final String entryClass = getJobClassNameOrScanClassPath(); try { - final Class mainClass = getClass().getClassLoader().loadClass(entryClass); - return new PackagedProgram(mainClass, programArguments); - } catch (ClassNotFoundException | ProgramInvocationException e) { + return PackagedProgram.newBuilder() + .setUserClassPaths(new ArrayList<>(getUserClassPaths())) + .setEntryPointClassName(entryClass) + .setArguments(programArguments) + .build(); + } catch (ProgramInvocationException e) { throw new FlinkException("Could not load the provided entrypoint class.", e); } } private String getJobClassNameOrScanClassPath() throws FlinkException { if (jobClassName != null) { + if (userLibDirectory != null) { + // check that we find the entrypoint class in the user lib directory. + if (!userClassPathContainsJobClass(jobClassName)) { + throw new FlinkException( + String.format( + "Could not find the provided job class (%s) in the user lib directory (%s).", + jobClassName, + userLibDirectory)); + } + } return jobClassName; } @@ -131,10 +149,47 @@ private String getJobClassNameOrScanClassPath() throws FlinkException { } } + private boolean userClassPathContainsJobClass(String jobClassName) { + for (URL userClassPath : getUserClassPaths()) { + try (final JarFile jarFile = new JarFile(userClassPath.getFile())) { + if (jarContainsJobClass(jobClassName, jarFile)) { + return true; + } + } catch (IOException e) { + ExceptionUtils.rethrow( + e, + String.format( + "Failed to open user class path %s. Make sure that all files on the user class path can be accessed.", + userClassPath)); + } + } + return false; + } + + private boolean jarContainsJobClass(String jobClassName, JarFile jarFile) { + return jarFile + .stream() + .map(JarEntry::getName) + .filter(fileName -> fileName.endsWith(FileUtils.CLASS_FILE_EXTENSION)) + .map(FileUtils::stripFileExtension) + .map(fileName -> fileName.replaceAll(Pattern.quote(File.separator), FileUtils.PACKAGE_SEPARATOR)) + .anyMatch(name -> name.equals(jobClassName)); + } + private String scanClassPathForJobJar() throws IOException { - LOG.info("Scanning class path for job JAR"); - JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jarsOnClassPath.get()); + final Iterable jars; + if (userLibDirectory == null) { + LOG.info("Scanning system class path for job JAR"); + jars = jarsOnClassPath.get(); + } else { + LOG.info("Scanning user class path for job JAR"); + jars = getUserClassPaths() + .stream() + .map(url -> new File(url.getFile())) + .collect(Collectors.toList()); + } + final JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jars); LOG.info("Using {} as job jar", jobJar); return jobJar.getEntryClass(); } @@ -164,4 +219,56 @@ private static boolean notNullAndNotEmpty(String string) { } } + static class Builder { + + private final JobID jobId; + + private final SavepointRestoreSettings savepointRestoreSettings; + + private final String[] programArguments; + + @Nullable + private String jobClassName; + + @Nullable + private File userLibDirectory; + + private Supplier> jarsOnClassPath = JarsOnClassPath.INSTANCE; + + private Builder(JobID jobId, SavepointRestoreSettings savepointRestoreSettings, String[] programArguments) { + this.jobId = requireNonNull(jobId); + this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings); + this.programArguments = requireNonNull(programArguments); + } + + Builder setJobClassName(@Nullable String jobClassName) { + this.jobClassName = jobClassName; + return this; + } + + Builder setUserLibDirectory(File userLibDirectory) { + this.userLibDirectory = userLibDirectory; + return this; + } + + Builder setJarsOnClassPath(Supplier> jarsOnClassPath) { + this.jarsOnClassPath = jarsOnClassPath; + return this; + } + + ClassPathJobGraphRetriever build() throws IOException { + return new ClassPathJobGraphRetriever( + jobId, + savepointRestoreSettings, + programArguments, + jobClassName, + jarsOnClassPath, + userLibDirectory); + } + } + + static Builder newBuilder(JobID jobId, SavepointRestoreSettings savepointRestoreSettings, String[] programArguments) { + return new Builder(jobId, savepointRestoreSettings, programArguments); + } + } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 8935f358d5237..b5be5c4151237 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -36,9 +36,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Optional; import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory; /** * {@link JobClusterEntrypoint} which is started with a job in a predefined @@ -74,10 +76,14 @@ private StandaloneJobClusterEntryPoint( } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException { + final ClassPathJobGraphRetriever.Builder classPathJobGraphRetrieverBuilder = ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, programArguments) + .setJobClassName(jobClassName); + tryFindUserLibDirectory().ifPresent(classPathJobGraphRetrieverBuilder::setUserLibDirectory); + return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( StandaloneResourceManagerFactory.INSTANCE, - new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName)); + classPathJobGraphRetrieverBuilder.build()); } public static void main(String[] args) { diff --git a/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml new file mode 100644 index 0000000000000..ac08f684ead37 --- /dev/null +++ b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-jar.xml @@ -0,0 +1,35 @@ + + + + test-user-classloader-job-jar + + jar + + false + + + ${project.build.testOutputDirectory} + / + + org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.class + + + + diff --git a/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml new file mode 100644 index 0000000000000..b1513219044a5 --- /dev/null +++ b/flink-container/src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml @@ -0,0 +1,35 @@ + + + + test-user-classloader-job-lib-jar + + jar + + false + + + ${project.build.testOutputDirectory} + / + + org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.class + + + + diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java index 433263dff4d2b..92228fd09a0ac 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java @@ -22,21 +22,34 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.JarsOnClassPath; +import org.apache.flink.container.entrypoint.testjar.TestJobInfo; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -44,6 +57,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Tests for the {@link ClassPathJobGraphRetriever}. @@ -53,20 +67,69 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger { @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @ClassRule + public static final TemporaryFolder JOB_DIRS = new TemporaryFolder(); + private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; + /* + * The directory structure used to test + * + * userDirHasEntryClass/ + * |_jarWithEntryClass + * |_jarWithoutEntryClass + * |_textFile + * + * userDirHasNotEntryClass/ + * |_jarWithoutEntryClass + * |_textFile + */ + + private static final Collection expectedURLs = new ArrayList<>(); + + private static File userDirHasEntryClass; + + private static File userDirHasNotEntryClass; + + @BeforeClass + public static void init() throws IOException { + final String textFileName = "test.txt"; + final String userDirHasEntryClassName = "_test_user_dir_has_entry_class"; + final String userDirHasNotEntryClassName = "_test_user_dir_has_not_entry_class"; + + userDirHasEntryClass = JOB_DIRS.newFolder(userDirHasEntryClassName); + final Path userJarPath = userDirHasEntryClass.toPath().resolve(TestJobInfo.JOB_JAR_PATH.toFile().getName()); + final Path userLibJarPath = + userDirHasEntryClass.toPath().resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName()); + userDirHasNotEntryClass = JOB_DIRS.newFolder(userDirHasNotEntryClassName); + + //create files + Files.copy(TestJobInfo.JOB_JAR_PATH, userJarPath); + Files.copy(TestJobInfo.JOB_LIB_JAR_PATH, userLibJarPath); + Files.createFile(userDirHasEntryClass.toPath().resolve(textFileName)); + + Files.copy(TestJobInfo.JOB_LIB_JAR_PATH, userDirHasNotEntryClass.toPath().resolve(TestJobInfo.JOB_LIB_JAR_PATH.toFile().getName())); + Files.createFile(userDirHasNotEntryClass.toPath().resolve(textFileName)); + + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); + Arrays.asList(userJarPath, userLibJarPath) + .stream() + .map(path -> FileUtils.relativizePath(workingDirectory, path)) + .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) + .forEach(expectedURLs::add); + } + @Test - public void testJobGraphRetrieval() throws FlinkException { + public void testJobGraphRetrieval() throws FlinkException, IOException { final int parallelism = 42; final Configuration configuration = new Configuration(); configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); final JobID jobId = new JobID(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - jobId, - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - TestJob.class.getCanonicalName()); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(jobId, SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); @@ -76,15 +139,12 @@ public void testJobGraphRetrieval() throws FlinkException { } @Test - public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundException { + public void testJobGraphRetrievalFromJar() throws FlinkException, IOException { final File testJar = TestJob.getTestJobJar(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - new JobID(), - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - // No class name specified, but the test JAR "is" on the class path - null, - () -> Collections.singleton(testJar)); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); @@ -92,17 +152,16 @@ public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundEx } @Test - public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws FlinkException, FileNotFoundException { + public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws FlinkException, IOException { final File testJar = new File("non-existing"); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - new JobID(), - SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS, - // Both a class name is specified and a JAR "is" on the class path - // The class name should have precedence. - TestJob.class.getCanonicalName(), - () -> Collections.singleton(testJar)); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + // Both a class name is specified and a JAR "is" on the class path + // The class name should have precedence. + .setJobClassName(TestJob.class.getCanonicalName()) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); @@ -110,16 +169,15 @@ public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws } @Test - public void testSavepointRestoreSettings() throws FlinkException { + public void testSavepointRestoreSettings() throws FlinkException, IOException { final Configuration configuration = new Configuration(); final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true); final JobID jobId = new JobID(); - final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - jobId, - savepointRestoreSettings, - PROGRAM_ARGUMENTS, - TestJob.class.getCanonicalName()); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(jobId, savepointRestoreSettings, PROGRAM_ARGUMENTS) + .setJobClassName(TestJob.class.getCanonicalName()) + .build(); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); @@ -160,6 +218,68 @@ public void testJarFromClassPathSupplier() throws IOException { assertThat(jarFiles, contains(file1, file2)); } + @Test + public void testJobGraphRetrievalFailIfJobDirDoesNotHaveEntryClass() throws IOException { + final File testJar = TestJob.getTestJobJar(); + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(() -> Collections.singleton(testJar)) + .setUserLibDirectory(userDirHasNotEntryClass) + .build(); + try { + classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + Assert.fail("This case should throw exception !"); + } catch (FlinkException e) { + assertTrue(ExceptionUtils + .findThrowableWithMessage(e, "Failed to find job JAR on class path") + .isPresent()); + } + } + + @Test + public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir() throws IOException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJobInfo.JOB_CLASS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasNotEntryClass) + .build(); + try { + classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + Assert.fail("This case should throw class not found exception!!"); + } catch (FlinkException e) { + assertTrue(ExceptionUtils + .findThrowableWithMessage(e, "Could not find the provided job class") + .isPresent()); + } + + } + + @Test + public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() throws IOException, FlinkException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasEntryClass) + .build(); + final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + + assertThat(jobGraph.getClasspaths(), containsInAnyOrder(expectedURLs.toArray())); + } + + @Test + public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() throws IOException, FlinkException { + final ClassPathJobGraphRetriever classPathJobGraphRetriever = + ClassPathJobGraphRetriever.newBuilder(new JobID(), SavepointRestoreSettings.none(), PROGRAM_ARGUMENTS) + .setJobClassName(TestJobInfo.JOB_CLASS) + .setJarsOnClassPath(Collections::emptyList) + .setUserLibDirectory(userDirHasEntryClass) + .build(); + final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration()); + + assertThat(jobGraph.getClasspaths(), containsInAnyOrder(expectedURLs.toArray())); + } + private static String javaClassPath(String... entries) { String pathSeparator = System.getProperty(JarsOnClassPath.PATH_SEPARATOR); return String.join(pathSeparator, entries); @@ -175,5 +295,4 @@ private static Iterable setClassPathAndGetJarsOnClassPath(String classPath System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, originalClassPath); } } - } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java new file mode 100644 index 0000000000000..d682eccebc9a0 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestJobInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * The test job information. + */ +public class TestJobInfo { + + public static final String JOB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob"; + public static final String JOB_LIB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJobLib"; + public static final Path JOB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-jar.jar"); + public static final Path JOB_LIB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-lib-jar.jar"); +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java new file mode 100644 index 0000000000000..1f68db037fff2 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +/** + * This class can used to test situation that the jar is not in the system classpath. + */ +public class TestUserClassLoaderJob { + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource source = env.fromElements(new TestUserClassLoaderJobLib().getValue(), 1, 2, 3, 4); + final SingleOutputStreamOperator mapper = source.map(element -> 2 * element); + mapper.addSink(new DiscardingSink<>()); + + ParameterTool parameterTool = ParameterTool.fromArgs(args); + env.execute(TestUserClassLoaderJob.class.getCanonicalName() + "-" + parameterTool.getRequired("arg")); + } +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java new file mode 100644 index 0000000000000..82f5a29b33d87 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint.testjar; + +/** + * This class is depended by {@link TestUserClassLoaderJob}. + */ +class TestUserClassLoaderJobLib { + + int getValue() { + return 0; + } + + public static void main(String[] args) { + } +} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d84b5626000cf..b156ec844e551 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -2021,6 +2021,9 @@ public final class ConfigConstants { /** The environment variable name which contains the Flink installation root directory. */ public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; + /** The user lib directory name. */ + public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib"; + // ---------------------------- Encoding ------------------------------ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index 08ea86281d2a7..37150edc5e643 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -84,6 +84,11 @@ public final class FileUtils { private static final String JAR_FILE_EXTENSION = "jar"; + public static final String CLASS_FILE_EXTENSION = "class"; + + public static final String PACKAGE_SEPARATOR = "."; + + // ------------------------------------------------------------------------ public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException { @@ -595,6 +600,16 @@ public static java.nio.file.Path getCurrentWorkingDirectory() { return Paths.get(System.getProperty("user.dir")); } + /** + * Checks whether the given file has a class extension. + * + * @param file to check + * @return true if the file has a class extension, otherwise false + */ + public static boolean isClassFile(java.nio.file.Path file) { + return CLASS_FILE_EXTENSION.equals(org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(file.toString())); + } + /** * Checks whether the given file has a jar extension. * @@ -605,6 +620,19 @@ public static boolean isJarFile(java.nio.file.Path file) { return JAR_FILE_EXTENSION.equals(org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(file.toString())); } + /** + * Remove the extension of the file name. + * @param fileName to strip + * @return the file name without extension + */ + public static String stripFileExtension(String fileName) { + final String extension = org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(fileName); + if (!extension.isEmpty()) { + return fileName.substring(0, fileName.lastIndexOf(extension) - 1); + } + return fileName; + } + /** * Converts the given {@link java.nio.file.Path} into a file {@link URL}. The resulting url is * relative iff the given path is relative. diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index ad34d71380333..56eb9b110c67d 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -84,7 +84,6 @@ public static void main(String[] args) throws Exception { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print(); } - // execute program env.execute("Streaming WordCount"); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java index fa79a26e23fbb..281fb56ddc487 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java @@ -81,7 +81,10 @@ public void testExternalProgram() throws Exception { String testData = getClass().getResource(TEST_DATA_FILE).toString(); - PackagedProgram program = new PackagedProgram(new File(jarFile), new String[]{testData}); + PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(jarFile)) + .setArguments(new String[]{testData}) + .build(); program.invokeInteractiveModeForExecution(); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java index e1c7c44e362f8..70935bb199da2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java @@ -18,11 +18,9 @@ package org.apache.flink.runtime.webmonitor.handlers; -import java.io.File; - /** * Query parameter specifying the name of the entry point class. - * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...) + * @see org.apache.flink.client.program.PackagedProgram.Builder#setEntryPointClassName(String) */ public class EntryClassQueryParameter extends StringQueryParameter { public EntryClassQueryParameter() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index 359b65543cdea..dc3b0beb02e9a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -129,7 +129,7 @@ protected CompletableFuture handleRequest(@Nonnull HandlerRequest tryFindUserLibDirectory() { + final File flinkHomeDirectory = deriveFlinkHomeDirectoryFromLibDirectory(); + final File usrLibDirectory = new File(flinkHomeDirectory, ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR); + + if (!usrLibDirectory.isDirectory()) { + return Optional.empty(); + } + return Optional.of(usrLibDirectory); + } + + @Nullable + private static File deriveFlinkHomeDirectoryFromLibDirectory() { + final String libDirectory = System.getenv().get(ConfigConstants.ENV_FLINK_LIB_DIR); + + if (libDirectory == null) { + return null; + } else { + return new File(libDirectory).getParentFile(); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 2f755193844d1..23280f0d54a6c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -140,7 +140,9 @@ public void tearDown() { @Test public void testCustomSplitJobWithCustomClassLoaderJar() throws ProgramInvocationException { - PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram inputSplitTestProg = PackagedProgram.newBuilder() + .setJarFile(new File(INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -153,7 +155,9 @@ public void testCustomSplitJobWithCustomClassLoaderJar() throws ProgramInvocatio @Test public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram streamingInputSplitTestProg = PackagedProgram.newBuilder() + .setJarFile(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -167,7 +171,9 @@ public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInv @Test public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException { URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL(); - PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); + PackagedProgram inputSplitTestProg2 = PackagedProgram.newBuilder() + .setJarFile(new File(INPUT_SPLITS_PROG_JAR_FILE)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -181,7 +187,7 @@ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, Pr @Test public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException { // regular streaming job - PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE)); + PackagedProgram streamingProg = PackagedProgram.newBuilder().setJarFile(new File(STREAMING_PROG_JAR_FILE)).build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -196,7 +202,9 @@ public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInv public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException { // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543) // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. - PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); + PackagedProgram streamingCheckpointedProg = PackagedProgram.newBuilder() + .setJarFile(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -230,13 +238,13 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw @Test public void testKMeansJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram kMeansProg = new PackagedProgram( - new File(KMEANS_JAR_PATH), - new String[] { + PackagedProgram kMeansProg = PackagedProgram.newBuilder() + .setJarFile(new File(KMEANS_JAR_PATH)) + .setArguments(new String[] { KMeansData.DATAPOINTS, KMeansData.INITIAL_CENTERS, - "25" - }); + "25"}) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -249,7 +257,9 @@ public void testKMeansJobWithCustomClassLoader() throws ProgramInvocationExcepti @Test public void testUserCodeTypeJobWithCustomClassLoader() throws ProgramInvocationException { - PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH)); + PackagedProgram userCodeTypeProg = PackagedProgram.newBuilder() + .setJarFile(new File(USERCODETYPE_JAR_PATH)) + .build(); TestEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -265,12 +275,10 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx File checkpointDir = FOLDER.newFolder(); File outputDir = FOLDER.newFolder(); - final PackagedProgram program = new PackagedProgram( - new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH), - new String[] { - checkpointDir.toURI().toString(), - outputDir.toURI().toString() - }); + final PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)) + .setArguments(new String[] { checkpointDir.toURI().toString(), outputDir.toURI().toString()}) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), @@ -298,14 +306,14 @@ public void testDisposeSavepointWithCustomKvState() throws Exception { File checkpointDir = FOLDER.newFolder(); File outputDir = FOLDER.newFolder(); - final PackagedProgram program = new PackagedProgram( - new File(CUSTOM_KV_STATE_JAR_PATH), - new String[] { - String.valueOf(parallelism), - checkpointDir.toURI().toString(), - "5000", - outputDir.toURI().toString() - }); + final PackagedProgram program = PackagedProgram.newBuilder() + .setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH)) + .setArguments(new String[] { + String.valueOf(parallelism), + checkpointDir.toURI().toString(), + "5000", + outputDir.toURI().toString()}) + .build(); TestStreamEnvironment.setAsContext( miniClusterResource.getMiniCluster(), diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index dfab48ef3c96e..76370a22ba0b1 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -108,7 +108,7 @@ public void testFlinkContainerMemory() throws Exception { final File streamingWordCountFile = getTestJarPath("WindowJoin.jar"); - final PackagedProgram packagedProgram = new PackagedProgram(streamingWordCountFile); + final PackagedProgram packagedProgram = PackagedProgram.newBuilder().setJarFile(streamingWordCountFile).build(); final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 1); try {