From b591f906685ee1379d791d296bd0b305cc7e9bb7 Mon Sep 17 00:00:00 2001 From: wangyang0918 Date: Mon, 18 May 2020 18:29:48 +0800 Subject: [PATCH] [FLINK-17796] Respect user specified classpath for application mode This closes #12222. --- .../ApplicationClusterEntryPoint.java | 25 +++++++++++++++++++ ...tandaloneApplicationClusterEntryPoint.java | 14 +++++------ ...ubernetesApplicationClusterEntrypoint.java | 14 +++++------ .../YarnApplicationClusterEntryPoint.java | 13 +++++----- 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java index aac0704666791..9dbc8fa3a8cd8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java @@ -18,8 +18,12 @@ package org.apache.flink.client.deployment.application; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; @@ -31,6 +35,12 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -69,4 +79,19 @@ protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore( final ScheduledExecutor scheduledExecutor) { return new MemoryArchivedExecutionGraphStore(); } + + protected static void configureExecution(final Configuration configuration, final PackagedProgram program) throws MalformedURLException { + configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, getClasspath(configuration, program), URL::toString); + } + + private static List getClasspath(final Configuration configuration, final PackagedProgram program) throws MalformedURLException { + final List classpath = ConfigUtils.decodeListFromConfig( + configuration, + PipelineOptions.CLASSPATHS, + URL::new); + classpath.addAll(program.getClasspaths()); + return Collections.unmodifiableList(classpath.stream().distinct().collect(Collectors.toList())); + } } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java index 08894505973a1..41e874347e77d 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java @@ -23,13 +23,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; -import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; -import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; @@ -44,7 +40,6 @@ import java.io.File; import java.io.IOException; -import java.net.URL; import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory; @@ -87,9 +82,12 @@ public static void main(String[] args) { } Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration); - configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); + try { + configureExecution(configuration, program); + } catch (Exception e) { + LOG.error("Could not apply application configuration.", e); + System.exit(1); + } StandaloneApplicationClusterEntryPoint entrypoint = new StandaloneApplicationClusterEntryPoint(configuration, program); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java index de9023d33f882..1c4902ffb556a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java @@ -22,13 +22,9 @@ import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; -import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; -import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -41,7 +37,6 @@ import java.io.File; import java.io.IOException; -import java.net.URL; import java.util.List; import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory; @@ -74,9 +69,12 @@ public static void main(final String[] args) { System.exit(1); } - configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); + try { + configureExecution(configuration, program); + } catch (Exception e) { + LOG.error("Could not apply application configuration.", e); + System.exit(1); + } final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint = new KubernetesApplicationClusterEntrypoint(configuration, program); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java index 9f57b207cd602..c8b107cdf2e7a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java @@ -22,12 +22,9 @@ import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever; -import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; -import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -44,7 +41,6 @@ import java.io.File; import java.io.IOException; -import java.net.URL; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -96,9 +92,12 @@ public static void main(final String[] args) { System.exit(1); } - configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); + try { + configureExecution(configuration, program); + } catch (Exception e) { + LOG.error("Could not apply application configuration.", e); + System.exit(1); + } YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint = new YarnApplicationClusterEntryPoint(configuration, program);