Skip to content

Commit

Permalink
[FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop no…
Browse files Browse the repository at this point in the history
…t in classpath

This closes apache#12317.
  • Loading branch information
kl0u committed May 28, 2020
1 parent c22d01d commit c8ab94f
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,22 +176,25 @@ protected void runApplication(String[] args) throws Exception {
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);

final ProgramOptions programOptions = new ProgramOptions(commandLine);

if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}

final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));

final ProgramOptions programOptions = new ProgramOptions(commandLine);

final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);

programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, Collections.singletonList(uri.toString()));
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}

Expand All @@ -206,20 +209,23 @@ protected void run(String[] args) throws Exception {
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}

final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final PackagedProgram program =
getPackagedProgram(programOptions);

final List<URL> jobJars = program.getJobJarAndDependencies();
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, jobJars);
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

Expand All @@ -242,16 +248,18 @@ private PackagedProgram getPackagedProgram(ProgramOptions programOptions) throws
}

private <T> Configuration getEffectiveConfiguration(
final CustomCommandLine activeCustomCommandLine,
final CommandLine commandLine,
final ProgramOptions programOptions,
final List<T> jobJars) throws FlinkException {

final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
checkNotNull(programOptions),
checkNotNull(jobJars));

final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
final Configuration executorConfig = checkNotNull(activeCustomCommandLine)
.applyCommandLineOptionsToConfiguration(commandLine);

final Configuration effectiveConfiguration = new Configuration(executorConfig);

executionParameters.applyToConfiguration(effectiveConfiguration);
Expand Down Expand Up @@ -292,8 +300,11 @@ protected void info(String[] args) throws Exception {

LOG.info("Creating program plan dump");

final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies());
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));

final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, program.getJobJarAndDependencies());

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
Expand Down Expand Up @@ -356,7 +367,7 @@ protected void list(String[] args) throws Exception {
showAll = listOptions.showAll();
}

final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);

runClusterAction(
activeCommandLine,
Expand Down Expand Up @@ -473,7 +484,7 @@ protected void stop(String[] args) throws Exception {

logAndSysout((advanceToEndOfEventTime ? "Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint.");

final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
runClusterAction(
activeCommandLine,
commandLine,
Expand Down Expand Up @@ -507,7 +518,7 @@ protected void cancel(String[] args) throws Exception {
return;
}

final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);

final String[] cleanedArgs = cancelOptions.getArgs();

Expand Down Expand Up @@ -597,7 +608,7 @@ protected void savepoint(String[] args) throws Exception {
return;
}

final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);

if (savepointOptions.isDispose()) {
runClusterAction(
Expand Down Expand Up @@ -1048,7 +1059,14 @@ public static List<CustomCommandLine> loadCustomCommandLines(Configuration confi
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
customCommandLines.add(
loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
}

// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
Expand All @@ -1067,15 +1085,15 @@ public static List<CustomCommandLine> loadCustomCommandLines(Configuration confi
* @param commandLine The input to the command-line.
* @return custom command-line which is active (may only be one at a time)
*/
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
LOG.debug("Custom commandlines: {}", customCommandLines);
for (CustomCommandLine cli : customCommandLines) {
LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No command-line ran.");
throw new IllegalStateException("No valid command-line found.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final
throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
}

return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
if (compatibleFactories.isEmpty()) {
throw new IllegalStateException(
"No ClusterClientFactory found. If you were targeting a Yarn cluster, " +
"please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your " +
"classpath. For more information refer to the \"Deployment & Operations\" section of the official " +
"Apache Flink documentation.");
}

return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -85,13 +84,12 @@ public void testMoreThanOneCompatibleFactoriesException() {
fail();
}

@Test
@Test(expected = IllegalStateException.class)
public void testNoFactoriesFound() {
final Configuration config = new Configuration();
config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET);

final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
assertNull(factory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ public PipelineExecutorFactory getExecutorFactory(final Configuration configurat
throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
}

return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
if (compatibleFactories.isEmpty()) {
throw new IllegalStateException("No ExecutorFactory found to execute the application.");
}

return compatibleFactories.get(0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)

val customCLI = frontend.getActiveCustomCommandLine(commandLine)
val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)

val serviceLoader = new DefaultClusterClientServiceLoader
Expand Down Expand Up @@ -283,7 +283,7 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)

val customCLI = frontend.getActiveCustomCommandLine(commandLine)
val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);

(executorConfig, None)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.cli;

import org.apache.flink.annotation.Internal;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

/**
* A stub Yarn Command Line to throw an exception with the correct
* message when the {@code HADOOP_CLASSPATH} is not set.
*/
@Internal
public class FallbackYarnSessionCli extends AbstractCustomCommandLine {

public static final String ID = "yarn-cluster";

private final Option applicationId;

public FallbackYarnSessionCli(Configuration configuration) {
super(configuration);
applicationId = new Option("yid", "yarnapplicationId", true, "Attach to running YARN session");
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
baseOptions.addOption(applicationId);
}

@Override
public boolean isActive(CommandLine commandLine) {
if (originalIsActive(commandLine)) {
throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
}
return false;
}

private boolean originalIsActive(CommandLine commandLine) {
final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
final boolean yarnJobManager = ID.equals(jobManagerOption);
final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}

@Override
public String getId() {
return ID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public enum YarnDeploymentTarget {
SESSION("yarn-session"),
APPLICATION("yarn-application");

public static final String ERROR_MESSAGE =
"No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable " +
"or have hadoop in your classpath. For more information refer to the \"Deployment & Operations\" " +
"section of the official Apache Flink documentation.";

private final String name;

YarnDeploymentTarget(final String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import javax.annotation.Nonnull;

Expand All @@ -44,6 +45,10 @@ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {

@Override
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
return new YarnJobClusterExecutor();
try {
return new YarnJobClusterExecutor();
} catch (NoClassDefFoundError e) {
throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import javax.annotation.Nonnull;

Expand All @@ -44,6 +45,10 @@ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {

@Override
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
return new YarnSessionClusterExecutor();
try {
return new YarnSessionClusterExecutor();
} catch (NoClassDefFoundError e) {
throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
}
}
}
Loading

0 comments on commit c8ab94f

Please sign in to comment.