Skip to content

Commit

Permalink
[FLINK-2954] Add config parameter for passing environment variables t…
Browse files Browse the repository at this point in the history
…o YARN

This closes apache#1409
  • Loading branch information
rmetzger committed Dec 5, 2015
1 parent 9849a57 commit b9639eb
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 30 deletions.
10 changes: 10 additions & 0 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,16 @@ so that the Flink client is able to pick those details up. This configuration pa
changing the default location of that file (for example for environments sharing a Flink
installation between users)

- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.`
will be passed as environment variables to the ApplicationMaster/JobManager process.
For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:

yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"


- `yarn.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom
environment variables for the TaskManager processes.

## High Availability Mode

- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,20 @@ public final class ConfigConstants {
*/
public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";

/**
* Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager).
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
*/
public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";

/**
* Similar to the {@link #YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the TaskManager processes.
*/
public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";


// ------------------------ Hadoop Configuration ------------------------

Expand Down Expand Up @@ -678,7 +692,7 @@ public final class ConfigConstants {
* The default path to the file containing the list of access privileged users and passwords.
*/
public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;

// ------------------------------ Akka Values ------------------------------

public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
Expand Down
6 changes: 0 additions & 6 deletions flink-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,6 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-language-binding-generic</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python</artifactId>
Expand Down
6 changes: 2 additions & 4 deletions flink-yarn-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ under the License.
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
Expand Down Expand Up @@ -156,6 +156,7 @@ under the License.
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
<relocations>
<relocation>
Expand All @@ -174,9 +175,6 @@ under the License.
<shadedPattern>org.apache.flink.hadoop.shaded.org.jboss.netty</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class UtilsTest {
private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testHeapCutoff() {
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );

// test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf) );
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));

conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
Expand Down Expand Up @@ -97,6 +98,28 @@ public void tooMuchCutoff() {
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

/**
 * Checks that a configuration entry stored under the ApplicationMaster environment prefix
 * is returned as an environment variable whose name is the key with the prefix removed.
 */
@Test
public void testGetEnvironmentVariables() {
	final Configuration flinkConf = new Configuration();
	flinkConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");

	final Map<String, String> envVars =
		Utils.getEnvironmentVariables("yarn.application-master.env.", flinkConf);

	// exactly one variable is expected, carrying the stripped key and the untouched value
	Assert.assertEquals(1, envVars.size());
	final Map.Entry<String, String> onlyEntry = envVars.entrySet().iterator().next();
	Assert.assertEquals("LD_LIBRARY_PATH", onlyEntry.getKey());
	Assert.assertEquals("/usr/lib/native", onlyEntry.getValue());
}

/**
 * Checks that a configuration key consisting of nothing but the environment prefix
 * (i.e. without a variable name) does not produce an environment variable.
 */
@Test
public void testGetEnvironmentVariablesErroneous() {
	final Configuration flinkConf = new Configuration();
	// the key ends right after the prefix, so there is no variable name to extract
	flinkConf.setString("yarn.application-master.env.", "/usr/lib/native");

	final Map<String, String> envVars =
		Utils.getEnvironmentVariables("yarn.application-master.env.", flinkConf);

	Assert.assertEquals(0, envVars.size());
}

//
// --------------- Tools to test if a certain string has been logged with Log4j. -------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private void testDetachedPerJobYarnClusterInternal(String job) {
}

// get temporary file for reading input data for wordcount example
File tmpInFile = null;
File tmpInFile;
try{
tmpInFile = tmp.newFile();
FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,11 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {

// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
// set user specified app master environment variables
appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
// set classpath from YARN configuration
Utils.setupEnv(conf, appMasterEnv);
// set configuration values
// set Flink on YARN internal configuration values
appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
Expand Down
33 changes: 20 additions & 13 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.flink.yarn;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
Expand Down Expand Up @@ -98,7 +98,7 @@ public static Path setupLocalResource(Configuration conf, FileSystem fs, String

Path dst = new Path(homedir, suffix);

LOG.info("Copying from " + localRsrcPath + " to " + dst );
LOG.info("Copying from " + localRsrcPath + " to " + dst);
fs.copyFromLocalFile(localRsrcPath, dst);
registerLocalResource(fs, dst, appMasterJar);
return dst;
Expand Down Expand Up @@ -187,17 +187,6 @@ private static void obtainTokenForHBase(Credentials credentials, Configuration c
}
}

/**
 * Logs, at INFO level, one line per entry of the current working directory (".").
 *
 * The listing is driven through a {@link FilenameFilter} that logs every candidate
 * entry and accepts all of them; the array returned by the listing is discarded.
 *
 * @param logger Logger that receives one message per directory entry
 */
public static void logFilesInCurrentDirectory(final Logger logger) {
	final File workingDir = new File(".");
	final FilenameFilter loggingFilter = new FilenameFilter() {

		@Override
		public boolean accept(File parent, String fileName) {
			// logging happens as a side effect of filtering; nothing is ever rejected
			logger.info(parent.getAbsolutePath() + "/" + fileName);
			return true;
		}
	};
	workingDir.list(loggingFilter);
}

/**
* Copied method from org.apache.hadoop.yarn.util.Apps
* It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
Expand All @@ -221,4 +210,22 @@ public static void addToEnvironment(Map<String, String> environment,
private Utils() {
throw new RuntimeException();
}

/**
 * Collects the environment variables defined in the given Flink configuration under a key prefix.
 *
 * Every configuration key that starts with {@code envPrefix} and has at least one more
 * character after it contributes one entry: the entry's key is the configuration key with
 * the prefix removed, the entry's value is the configured value. Keys that are equal to the
 * bare prefix, and keys that do not start with the prefix, are ignored.
 *
 * @param envPrefix Prefix of the configuration keys that define environment variables
 * @param flinkConfiguration The Flink config to read the environment variable definitions from
 * @return A new map from environment variable name to its value; empty if nothing matches
 */
public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
	final Map<String, String> envVars = new HashMap<>();
	for (Map.Entry<String, String> configEntry : flinkConfiguration.toMap().entrySet()) {
		final String configKey = configEntry.getKey();
		if (!configKey.startsWith(envPrefix)) {
			// not an environment variable definition for this prefix
			continue;
		}
		if (configKey.length() <= envPrefix.length()) {
			// the key is just the prefix, so there is no variable name to use
			continue;
		}
		final String variableName = configKey.substring(envPrefix.length());
		envVars.put(variableName, configEntry.getValue());
	}
	return envVars;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ abstract class ApplicationMasterBase {

// if a web monitor shall be started, set the port to random binding
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
}

val (actorSystem, jmActor, archiveActor, webMonitor) =
Expand Down Expand Up @@ -147,7 +147,7 @@ abstract class ApplicationMasterBase {
jobManagerPort, webServerPort, slots, taskManagerCount,
dynamicPropertiesEncodedString)

val hadoopConfig = new YarnConfiguration();
val hadoopConfig = new YarnConfiguration()

// send "start yarn session" message to YarnJobManager.
log.info("Starting YARN session on Job Manager.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,12 @@ class YarnJobManager(

ctx.setLocalResources(taskManagerLocalResources.asJava)

// Setup classpath for container ( = TaskManager )
// Setup classpath and environment variables for container ( = TaskManager )
val containerEnv = new java.util.HashMap[String, String]()
// user defined TaskManager environment variables
containerEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
flinkConfiguration))
// YARN classpath
Utils.setupEnv(yarnConf, containerEnv)
containerEnv.put(FlinkYarnClientBase.ENV_CLIENT_USERNAME, yarnClientUsername)
ctx.setEnvironment(containerEnv)
Expand Down

0 comments on commit b9639eb

Please sign in to comment.