Skip to content

Commit

Permalink
[FLINK-16661][cli] Fix log configuration forwarding for yarn applicat…
Browse files Browse the repository at this point in the history
…ion mode
  • Loading branch information
kl0u committed Apr 30, 2020
1 parent e2ccf81 commit d553c7f
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
Expand Down Expand Up @@ -125,8 +127,6 @@
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;

/**
* The descriptor with deployment information for deploying a Flink cluster on Yarn.
Expand Down Expand Up @@ -400,7 +400,6 @@ public ClusterClientProvider<ApplicationId> deployApplicationCluster(
checkNotNull(applicationConfiguration);

final YarnDeploymentTarget deploymentTarget = YarnDeploymentTarget.fromConfig(flinkConfiguration);

if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Yarn Application Cluster." +
Expand All @@ -410,6 +409,9 @@ public ClusterClientProvider<ApplicationId> deployApplicationCluster(

applicationConfiguration.applyToConfiguration(flinkConfiguration);

final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

try {
return deployInternal(
clusterSpecification,
Expand Down Expand Up @@ -966,16 +968,11 @@ private ApplicationReport startAppMaster(
}
}

final boolean hasLogback = logConfigFilePath != null && logConfigFilePath.endsWith(CONFIG_FILE_LOGBACK_NAME);
final boolean hasLog4j = logConfigFilePath != null && logConfigFilePath.endsWith(CONFIG_FILE_LOG4J_NAME);

final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithFallbackForLegacyHeap(
flinkConfiguration,
JobManagerOptions.TOTAL_PROCESS_MEMORY);
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
processSpec);

Expand Down Expand Up @@ -1612,8 +1609,6 @@ void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {

ContainerLaunchContext setupApplicationMasterContainer(
String yarnClusterEntrypoint,
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
JobManagerProcessSpec processSpec) {
// ------------------ Prepare Application Master Container ------------------------------
Expand All @@ -1639,22 +1634,8 @@ ContainerLaunchContext setupApplicationMasterContainer(
startCommandValues.put("jvmmem", jvmHeapMem);

startCommandValues.put("jvmopts", javaOpts);
String logging = "";

if (hasLogback || hasLog4j) {
logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";

if (hasLogback) {
logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
}

if (hasLog4j) {
logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
logging += " -Dlog4j.configurationFile=file:" + CONFIG_FILE_LOG4J_NAME;
}
}
startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

startCommandValues.put("logging", logging);
startCommandValues.put("class", yarnClusterEntrypoint);
startCommandValues.put("redirects",
"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.yarn.cli;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
Expand All @@ -45,7 +44,7 @@
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;

Expand Down Expand Up @@ -77,7 +76,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -97,9 +95,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {

//------------------------------------ Constants -------------------------

public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";

private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;

/** The id for the CommandLine interface. */
Expand Down Expand Up @@ -438,38 +433,7 @@ private void applyDescriptorOptionToConfig(final CommandLine commandLine, final
configuration.setString(YarnConfigOptions.NODE_LABEL, nodeLabelValue);
}

setLogConfigFileInConfig(configuration, configurationDirectory);
}

@VisibleForTesting
public static Configuration setLogConfigFileInConfig(final Configuration configuration, final String configurationDirectory) {
if (configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
return configuration;
}

FlinkYarnSessionCli.discoverLogConfigFile(configurationDirectory).ifPresent(file ->
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
return configuration;
}

private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
Optional<File> logConfigFile = Optional.empty();

final File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
if (log4jFile.exists()) {
logConfigFile = Optional.of(log4jFile);
}

final File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
if (logbackFile.exists()) {
if (logConfigFile.isPresent()) {
LOG.warn("The configuration directory ('" + configurationDirectory + "') already contains a LOG4J config file." +
"If you want to use logback, then please delete or rename the log configuration file.");
} else {
logConfigFile = Optional.of(logbackFile);
}
}
return logConfigFile;
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
}

private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A class with utilities for setting the log config file.
*/
@Internal
public class YarnLogConfigUtil {

private static final Logger LOG = LoggerFactory.getLogger(YarnLogConfigUtil.class);

public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";

@VisibleForTesting
public static Configuration setLogConfigFileInConfig(
final Configuration configuration,
final String configurationDirectory) {

if (configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != null) {
return configuration;
}

discoverLogConfigFile(configurationDirectory).ifPresent(file ->
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
return configuration;
}

private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
Optional<File> logConfigFile = Optional.empty();

final File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
if (log4jFile.exists()) {
logConfigFile = Optional.of(log4jFile);
}

final File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
if (logbackFile.exists()) {
if (logConfigFile.isPresent()) {
LOG.warn("The configuration directory ('" + configurationDirectory + "') already contains a LOG4J config file." +
"If you want to use logback, then please delete or rename the log configuration file.");
} else {
logConfigFile = Optional.of(logbackFile);
}
}
return logConfigFile;
}

public static String getLoggingYarnCommand(final Configuration configuration) {
checkNotNull(configuration);

final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath == null) {
return "";
}

String logCommand = getLog4jCommand(logConfigFilePath);
if (logCommand.isEmpty()) {
logCommand = getLogBackCommand(logConfigFilePath);
}
return logCommand;
}

private static String getLogBackCommand(final String logConfigFilePath) {
final boolean hasLogback = logConfigFilePath.endsWith(CONFIG_FILE_LOGBACK_NAME);
if (!hasLogback) {
return "";
}

return new StringBuilder("-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"")
.append(" -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME)
.toString();
}

private static String getLog4jCommand(final String logConfigFilePath) {
final boolean hasLog4j = logConfigFilePath.endsWith(CONFIG_FILE_LOG4J_NAME);
if (!hasLog4j) {
return "";
}

return new StringBuilder("-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"")
.append(" -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME)
.append(" -Dlog4j.configurationFile=file:" + CONFIG_FILE_LOG4J_NAME)
.toString();
}
}
Loading

0 comments on commit d553c7f

Please sign in to comment.