-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17796] Respect user specified classpath for application mode #12222
[FLINK-17796] Respect user specified classpath for application mode #12222
Conversation
442b1b9
to
16460c8
Compare
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit fd1a7f5 (Fri Oct 16 10:48:45 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
ConfigUtils.decodeListFromConfig( | ||
configuration, | ||
PipelineOptions.CLASSPATHS, | ||
url -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of having this code in different places (see ExecutionConfigAccessor
), wouldn't it make sense to change the ConfigUtils.decodeListFromConfig()
to sth like:
public static <IN, OUT, E extends Throwable> List<OUT> decodeListFromConfig(
final ReadableConfig configuration,
final ConfigOption<List<IN>> key,
final FunctionWithException<IN, OUT, E> mapper) throws E {
checkNotNull(configuration);
checkNotNull(key);
checkNotNull(mapper);
final List<IN> encodedString = configuration.get(key);
if (encodedString == null) {
return Collections.emptyList();
}
final List<OUT> result = new ArrayList<>(encodedString.size());
for (IN input : encodedString) {
result.add(mapper.apply(input));
}
return result;
}
And have this method become simply:
protected static List<URL> getClasspath(final Configuration configuration, final PackagedProgram program) throws MalformedURLException {
final List<URL> classpath = ConfigUtils.decodeListFromConfig(
configuration,
PipelineOptions.CLASSPATHS,
URL::new);
classpath.addAll(program.getClasspaths());
return Collections.unmodifiableList(classpath.stream().distinct().collect(Collectors.toList()));
}
Of course this will require some more changes in the ExecutionConfigAccessor
and all the classes that are affected by the thrown exception, e.g. the executors who do not throw exception, they will now have to bubble up the exception.
a9efd8d
to
3edcf14
Compare
@kl0u I have addressed your comments. Please have another look. |
3edcf14
to
fd1a7f5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good @wangyang0918 ! I will merge as soon as my Azure gives green.
What is the purpose of the change
Currently, when we deploy a Flink application cluster(e.g. Yarn, K8s),
C/-classpath
could not work properly. Since we override thePipelineOptions.CLASSPATHS
inYarnApplicationClusterEntrypoint
andKubernetesApplicationClusterEntrypoint
. Then client updated configuration will not take effect.Brief change log
getClasspath
to combine the user specified classpath and program classpathgetClasspath
to all deploymentVerifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation