Skip to content

Commit

Permalink
[FLINK-13938][yarn] Add yarn.provided.lib.dirs for shared libs across…
Browse files Browse the repository at this point in the history
… apps on YARN

Currently, for every job execution, the flink lib jars are uploaded to
hdfs and then registered as Yarn local resources.

This PR introduces the yarn.provided.lib.dirs YARN option which allows
users to specify HDFS paths to dirs that contain files to be shared across
applications, e.g. the FLINK-DIST jar.

This makes job submission more efficient as it allows for two optimizations:
* Use pre-uploaded flink binary to avoid uploading of flink system jars
* By default, the resource visibility for the shared libs is set to PUBLIC
  so that they will be downloaded only once and shared for all tasks running
  on the same node. This will make launching a container faster.

A command that leverages this feature looks like the following:

./bin/flink run -m yarn-cluster -d \
-yD yarn.provided.lib.dirs="hdfs://$namenode_address/flink-lib/flink-1.11-SNAPSHOT/lib;hdfs://$namenode_address/flink-lib/flink-1.11-SNAPSHOT/plugins" \
examples/streaming/WindowJoin.jar

This closes apache#12040.
  • Loading branch information
wangyang0918 authored and kl0u committed May 12, 2020
1 parent 486221d commit c59e894
Show file tree
Hide file tree
Showing 14 changed files with 631 additions and 130 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<td>String</td>
<td>When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).</td>
</tr>
<tr>
<td><h5>yarn.provided.lib.dirs</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars (e.g. flink-dist, lib/, plugins/) from uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they don't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib</td>
</tr>
<tr>
<td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
<td style="word-wrap: break-word;">"krb5.keytab"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.util.TestLogger;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.junit.Assert;
Expand Down Expand Up @@ -92,7 +94,12 @@ public void testCreateTaskExecutorCredentials() throws Exception {
env.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, "");
env.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, "");
env.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
env.put(YarnConfigKeys.FLINK_JAR_PATH, root.toURI().toString());
env.put(YarnConfigKeys.FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
"flink.jar",
new Path(root.toURI()),
0,
System.currentTimeMillis(),
LocalResourceVisibility.APPLICATION).toString());
env = Collections.unmodifiableMap(env);

File credentialFile = temporaryFolder.newFile("container_tokens");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,28 @@
import org.apache.flink.yarn.testjob.YarnTestCacheJob;
import org.apache.flink.yarn.util.YarnTestUtils;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Test cases for the deployment of Yarn Flink clusters.
Expand All @@ -57,7 +65,7 @@ public class YARNITCase extends YarnTestBase {
private static final int sleepIntervalInMS = 100;

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
public final TemporaryFolder temporaryFolder = new TemporaryFolder(new File("/tmp"));

@BeforeClass
public static void setup() {
Expand Down Expand Up @@ -86,6 +94,18 @@ public void testPerJobModeWithDistributedCache() throws Exception {
YarnTestCacheJob.getDistributedCacheJobGraph(tmp.newFolder())));
}

@Test
public void testPerJobWithProvidedLibDirs() throws Exception {
final File tmpFlinkReleaseBinary = temporaryFolder.newFolder("flink-provided-lib");
FileUtils.copyDirectory(flinkLibFolder, tmpFlinkReleaseBinary);

final List<String> sharedLibDirs = Collections.singletonList(tmpFlinkReleaseBinary.getAbsoluteFile().toString());
final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, sharedLibDirs);

runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph()));
}

private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

Expand Down Expand Up @@ -115,12 +135,32 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

checkStagingDirectory(configuration, applicationId);

waitApplicationFinishedElseKillIt(
applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
}
}
}

// Verifies that the lib directory is uploaded to the staging directory if and only if
// no provided lib dirs are configured (provided dirs make the upload unnecessary).
private void checkStagingDirectory(Configuration flinkConfig, ApplicationId appId) throws IOException {
	final List<String> configuredProvidedLibDirs = flinkConfig.get(YarnConfigOptions.PROVIDED_LIB_DIRS);
	final boolean providedLibDirsConfigured =
		configuredProvidedLibDirs != null && !configuredProvidedLibDirs.isEmpty();

	try (final FileSystem fs = FileSystem.get(YARN_CONFIGURATION)) {
		final Path appStagingDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString());
		final boolean libUploaded = fs.exists(new Path(appStagingDir, flinkLibFolder.getName()));
		if (providedLibDirsConfigured) {
			assertFalse(
				"The provided lib dirs is set, so the lib directory should not be uploaded to staging directory.",
				libUploaded);
		} else {
			assertTrue(
				"The lib directory should be uploaded to staging directory.",
				libUploaded);
		}
	}
}

private JobGraph getTestingJobGraph() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
Expand Down
67 changes: 36 additions & 31 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

/**
* Utility class that provides helper methods to work with Apache Hadoop YARN.
Expand Down Expand Up @@ -125,25 +127,30 @@ public static void deleteApplicationFiles(final Map<String, String> env) {
static LocalResource registerLocalResource(
Path remoteRsrcPath,
long resourceSize,
long resourceModificationTime) {
long resourceModificationTime,
LocalResourceVisibility resourceVisibility) {
LocalResource localResource = Records.newRecord(LocalResource.class);
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
localResource.setSize(resourceSize);
localResource.setTimestamp(resourceModificationTime);
localResource.setType(LocalResourceType.FILE);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setVisibility(resourceVisibility);
return localResource;
}

/**
 * Creates a YARN resource for the remote object at the given location, deriving size and
 * modification time from the remote file status. Visibility is fixed to
 * {@link LocalResourceVisibility#APPLICATION}.
 *
 * @param fs remote filesystem
 * @param remoteRsrcPath resource path to be registered
 * @return YARN resource
 * @throws IOException if the status of the remote file cannot be retrieved
 */
private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
	final FileStatus fileStatus = fs.getFileStatus(remoteRsrcPath);
	return registerLocalResource(
		remoteRsrcPath,
		fileStatus.getLen(),
		fileStatus.getModificationTime(),
		LocalResourceVisibility.APPLICATION);
}

public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Expand Down Expand Up @@ -319,8 +326,8 @@ static ContainerLaunchContext createTaskExecutorContext(

// get and validate all relevant variables

String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_DIST_JAR);
require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_DIST_JAR);

String appId = env.get(YarnConfigKeys.ENV_APP_ID);
require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
Expand Down Expand Up @@ -379,19 +386,14 @@ static ContainerLaunchContext createTaskExecutorContext(
hasKrb5 = true;
}

// register Flink Jar with remote HDFS
final String flinkJarPath;
final LocalResource flinkJar;
{
Path remoteJarPath = new Path(remoteFlinkJarPath);
FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
flinkJarPath = remoteJarPath.getName();
flinkJar = registerLocalResource(fs, remoteJarPath);
}

Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();

taskManagerLocalResources.put(flinkJarPath, flinkJar);
// register Flink Jar with remote HDFS
final YarnLocalResourceDescriptor flinkDistLocalResourceDesc =
YarnLocalResourceDescriptor.fromString(remoteFlinkJarPath);
taskManagerLocalResources.put(
flinkDistLocalResourceDesc.getResourceKey(),
flinkDistLocalResourceDesc.toLocalResource());

//To support Yarn Secure Integration Test Scenario
if (yarnConfResource != null) {
Expand All @@ -405,15 +407,8 @@ static ContainerLaunchContext createTaskExecutorContext(
}

// prepare additional files to be shipped
for (String pathStr : shipListString.split(",")) {
if (!pathStr.isEmpty()) {
String[] keyAndPath = pathStr.split("=");
require(keyAndPath.length == 2, "Invalid entry in ship file list: %s", pathStr);
Path path = new Path(keyAndPath[1]);
LocalResource resource = registerLocalResource(path.getFileSystem(yarnConfig), path);
taskManagerLocalResources.put(keyAndPath[0], resource);
}
}
decodeYarnLocalResourceDescriptorListFromString(shipListString).forEach(
resourceDesc -> taskManagerLocalResources.put(resourceDesc.getResourceKey(), resourceDesc.toLocalResource()));

// now that all resources are prepared, we can create the launch context

Expand Down Expand Up @@ -497,6 +492,16 @@ static ContainerLaunchContext createTaskExecutorContext(
return ctx;
}

/**
 * Parses a string of encoded {@link YarnLocalResourceDescriptor}s, separated by
 * {@code LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR}, back into a list. Empty entries are skipped.
 *
 * @param resources the encoded, separator-joined descriptor string
 * @return the decoded descriptors
 * @throws Exception if an entry cannot be parsed
 */
private static List<YarnLocalResourceDescriptor> decodeYarnLocalResourceDescriptorListFromString(String resources) throws Exception {
	final List<YarnLocalResourceDescriptor> descriptors = new ArrayList<>();
	for (final String encoded : resources.split(LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR)) {
		if (encoded.isEmpty()) {
			continue;
		}
		descriptors.add(YarnLocalResourceDescriptor.fromString(encoded));
	}
	return descriptors;
}

/**
* Validates a condition, throwing a RuntimeException if the condition is violated.
*
Expand Down
Loading

0 comments on commit c59e894

Please sign in to comment.