Skip to content

Commit

Permalink
[FLINK-17796] Respect user specified classpath for application mode
Browse files Browse the repository at this point in the history
This closes apache#12222.
  • Loading branch information
wangyang0918 authored and kl0u committed May 18, 2020
1 parent 2f13ba1 commit b591f90
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

package org.apache.flink.client.deployment.application;

import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
Expand All @@ -31,6 +35,12 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -69,4 +79,19 @@ protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
final ScheduledExecutor scheduledExecutor) {
return new MemoryArchivedExecutionGraphStore();
}

protected static void configureExecution(final Configuration configuration, final PackagedProgram program) throws MalformedURLException {
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, getClasspath(configuration, program), URL::toString);
}

private static List<URL> getClasspath(final Configuration configuration, final PackagedProgram program) throws MalformedURLException {
final List<URL> classpath = ConfigUtils.decodeListFromConfig(
configuration,
PipelineOptions.CLASSPATHS,
URL::new);
classpath.addAll(program.getClasspaths());
return Collections.unmodifiableList(classpath.stream().distinct().collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
Expand All @@ -44,7 +40,6 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;

import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory;

Expand Down Expand Up @@ -87,9 +82,12 @@ public static void main(String[] args) {
}

Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}

StandaloneApplicationClusterEntryPoint entrypoint = new StandaloneApplicationClusterEntryPoint(configuration, program);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -41,7 +37,6 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;

import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory;
Expand Down Expand Up @@ -74,9 +69,12 @@ public static void main(final String[] args) {
System.exit(1);
}

configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}

final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
new KubernetesApplicationClusterEntrypoint(configuration, program);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -44,7 +41,6 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -96,9 +92,12 @@ public static void main(final String[] args) {
System.exit(1);
}

configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}

YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =
new YarnApplicationClusterEntryPoint(configuration, program);
Expand Down

0 comments on commit b591f90

Please sign in to comment.