From 8c52ea08b81ef0e3f829d77eca06aab5bca16137 Mon Sep 17 00:00:00 2001 From: Kostas Kloudas Date: Thu, 14 May 2020 10:26:42 +0200 Subject: [PATCH] [minor] Refactoring the ClassPathPackagedProgramRetriever creation --- ...tandaloneApplicationClusterEntryPoint.java | 4 +++- ...ubernetesApplicationClusterEntrypoint.java | 4 +++- .../YarnApplicationClusterEntryPoint.java | 20 ++++++++++++------- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java index b738b3c667967..08894505973a1 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java @@ -42,6 +42,7 @@ import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.net.URL; @@ -114,11 +115,12 @@ private static PackagedProgram getPackagedProgram( private static PackagedProgramRetriever getPackagedProgramRetriever( final String[] programArguments, @Nullable final String jobClassName) throws IOException { + final File userLibDir = tryFindUserLibDirectory().orElse(null); final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = ClassPathPackagedProgramRetriever .newBuilder(programArguments) + .setUserLibDirectory(userLibDir) .setJobClassName(jobClassName); - tryFindUserLibDirectory().ifPresent(retrieverBuilder::setUserLibDirectory); return retrieverBuilder.build(); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java index 2080b5426718f..de9023d33f882 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java @@ -104,12 +104,14 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(configuration); Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + final File userLibDir = tryFindUserLibDirectory().orElse(null); + final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = ClassPathPackagedProgramRetriever .newBuilder(programArguments) + .setUserLibDirectory(userLibDir) .setJarFile(pipelineJars.get(0)) .setJobClassName(jobClassName); - tryFindUserLibDirectory().ifPresent(retrieverBuilder::setUserLibDirectory); return retrieverBuilder.build(); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java index 5cb1e17d0dbd6..9f57b207cd602 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java @@ -123,17 +123,23 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( final String[] programArguments, @Nullable final String jobClassName) throws IOException { - final List pipelineJars = configuration.get(PipelineOptions.JARS).stream() - .map(uri -> new File(YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null), new Path(uri).getName())) - .collect(Collectors.toList()); - Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); - + final File userLibDir = YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null); + final File userApplicationJar = getUserApplicationJar(userLibDir, configuration); final ClassPathPackagedProgramRetriever.Builder retrieverBuilder = ClassPathPackagedProgramRetriever .newBuilder(programArguments) - .setJarFile(pipelineJars.get(0)) + .setUserLibDirectory(userLibDir) + .setJarFile(userApplicationJar) .setJobClassName(jobClassName); - YarnEntrypointUtils.getUsrLibDir(configuration).ifPresent(retrieverBuilder::setUserLibDirectory); return retrieverBuilder.build(); } + + private static File getUserApplicationJar(final File userLibDir, final Configuration configuration) { + final List pipelineJars = configuration.get(PipelineOptions.JARS).stream() + .map(uri -> new File(userLibDir, new Path(uri).getName())) + .collect(Collectors.toList()); + + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + return pipelineJars.get(0); + } }