[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs
+ includes a new unit test for recursive uploads to hdfs:https:// targets
+ adds a unit test for recursive file uploads to s3:https:// via s3a (a minimal sketch of the recursive-upload check follows below)
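
The following is a hedged sketch of that recursive-upload check only, not the test code added by this commit. It assumes the package-private uploadAndRegisterFiles helper introduced further down in AbstractYarnClusterDescriptor, and it uses local temporary directories to stand in for the hdfs:https:// and s3a:https:// targets; the class name, directory layout and printed expectations are illustrative.

// Hedged sketch only: names, layout and expectations are illustrative, not the committed tests.
package org.apache.flink.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecursiveUploadSketch {

	public static void main(String[] args) throws Exception {
		// local source directory with a nested file, as handled by the ship-file logic
		File shipDir = Files.createTempDirectory("ship").toFile();
		File nested = new File(shipDir, "nested/data.txt");
		nested.getParentFile().mkdirs();
		Files.write(nested.toPath(), "hello".getBytes());

		// in the real tests the target is an hdfs:https:// mini-cluster or an s3a:https:// bucket;
		// a local temporary directory keeps this sketch self-contained
		FileSystem targetFs = FileSystem.get(new Configuration());
		Path targetHome = new Path(Files.createTempDirectory("staging").toUri());

		List<Path> remotePaths = new ArrayList<>();
		Map<String, LocalResource> localResources = new HashMap<>();
		StringBuilder envShipFileList = new StringBuilder();

		List<String> classPaths = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
			Collections.singletonList(shipDir),
			targetFs,
			targetHome,
			ApplicationId.newInstance(System.currentTimeMillis(), 1),
			remotePaths,
			localResources,
			envShipFileList);

		// nested files should be registered under keys that preserve their relative path
		System.out.println(classPaths);              // expected to contain a key ending in "nested/data.txt"
		System.out.println(localResources.keySet()); // same keys, mapped to LocalResource entries
	}
}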

[FLINK-4228][yarn/s3] turn the dependencies around

Instead of having flink-s3-fs-hadoop depend on flink-yarn_<scala_version>,
let flink-yarn depend on the s3 filesystem and implement the test there.
This is safer with regard to the Scala-independent flink-s3-fs-hadoop project.

[FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations

This is how YARN would use it and what should really be tested.
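
Concretely, startAppMaster obtains its staging file system via Hadoop's FileSystem factory (the FileSystem.get(conf) call in the diff below), so whichever implementation fs.defaultFS resolves to is the one exercised. A hedged illustration of that resolution follows; the bucket name and credential lookup are placeholders and not part of this commit.

// Illustration only: shows how Hadoop resolves fs.defaultFS to its s3a implementation,
// i.e. the same FileSystem.get(conf) path the YARN staging code takes.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3aDefaultFsSketch {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "s3a:https://my-test-bucket/");          // placeholder bucket
		conf.set("fs.s3a.access.key", System.getenv("ACCESS_KEY")); // placeholder credential lookup
		conf.set("fs.s3a.secret.key", System.getenv("SECRET_KEY"));

		// with hadoop-aws (and a compatible aws-java-sdk) on the classpath this returns
		// org.apache.hadoop.fs.s3a.S3AFileSystem, the file system startAppMaster would stage to
		FileSystem fs = FileSystem.get(conf);
		System.out.println(fs.getClass().getName());
		System.out.println(fs.getHomeDirectory());
	}
}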

[FLINK-4228][yarn] enable S3 tests for newer Hadoop versions

- requires the 'include_hadoop_aws' build profile (or property) to be set
- requires a newer aws-sdk version than the one Hadoop pulls in, so that it works with our
  httpcomponents version
- we also add a check that at least one S3 implementation is tested, so that the tests are not
  silently skipped because of such a missing dependency (see the sketch after this list)
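
One way such a guard can look is a counter of the implementations that actually ran, checked after the suite; this is a sketch of the idea only, not necessarily how YarnFileStageTestS3ITCase implements it.

// Sketch of the idea only: fail the suite if every S3 test was silently skipped.
import org.junit.AfterClass;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class S3CoverageGuardSketch {

	// incremented by every test that actually ran against an S3 file system implementation
	private static int testedImplementations = 0;

	@Test
	public void testRecursiveUploadViaS3a() throws Exception {
		// ... run the recursive upload against an s3a:https:// target ...
		testedImplementations++;
	}

	@AfterClass
	public static void checkAtLeastOneImplementationTested() {
		// fail loudly instead of having every test silently skipped because the
		// hadoop-aws / aws-java-sdk test dependencies were not pulled in
		assertTrue("no S3 implementation was tested", testedImplementations > 0);
	}
}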

This closes apache#4939.
Nico Kruber authored and tillrohrmann committed Nov 18, 2017
1 parent b00f1b3 commit cf8504d
Showing 6 changed files with 698 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -59,7 +59,7 @@ matrix:
- jdk: "oraclejdk8"
env:
- TEST="misc"
- PROFILE="-Dhadoop.version=2.8.0"
- PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws"
- CACHE_NAME=JDK8_H280_M
- jdk: "openjdk8"
env:
63 changes: 63 additions & 0 deletions flink-yarn/pom.xml
@@ -31,6 +31,12 @@ under the License.
<name>flink-yarn</name>
<packaging>jar</packaging>

<properties>
<!-- for testing (will override Hadoop's default dependency on too low SDK versions that
do not work with our httpcomponents version) -->
<aws.sdk.version>1.11.171</aws.sdk.version>
</properties>

<dependencies>

<!-- core dependencies -->
@@ -153,6 +159,63 @@ under the License.
</plugins>
</build>
</profile>

<profile>
<!-- Hadoop >= 2.6 moved the S3 file systems from hadoop-common into hadoop-aws artifact
(see https://issues.apache.org/jira/browse/HADOOP-11074)
We can add the (test) dependency per default once 2.6 is the minimum required version.
-->
<id>include_hadoop_aws</id>
<activation>
<property>
<name>include_hadoop_aws</name>
</property>
</activation>
<dependencies>
<!-- for the S3 tests of YarnFileStageTestS3ITCase -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<!-- The aws-java-sdk-core requires jackson 2.6, but
hadoop pulls in 2.3 -->
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- override Hadoop's default dependency on too low SDK versions that do not work
with our httpcomponents version when initialising the s3a file system -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.sdk.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>

</profiles>

<build>
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,6 +18,7 @@

package org.apache.flink.yarn;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
@@ -624,6 +625,7 @@ public ApplicationReport startAppMaster(
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);
final Path homeDir = fs.getHomeDirectory();

// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
@@ -705,11 +707,25 @@ public ApplicationReport startAppMaster(
StringBuilder envShipFileList = new StringBuilder();

// upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
List<String> systemClassPaths = uploadAndRegisterFiles(
systemShipFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);

List<String> userClassPaths;
if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}
@@ -739,32 +755,29 @@ public ApplicationReport startAppMaster(
}

// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
Path remotePathJar = Utils.setupLocalResource(
Path remotePathJar = setupSingleLocalResource(
"flink.jar",
fs,
appId.toString(),
appId,
flinkJarPath,
appMasterJar,
fs.getHomeDirectory());

localResources.put("flink.jar", appMasterJar);
localResources,
homeDir,
"");

// Upload the flink configuration
LocalResource flinkConf = Records.newRecord(LocalResource.class);

// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);

Path remotePathConf = Utils.setupLocalResource(
Path remotePathConf = setupSingleLocalResource(
"flink-conf.yaml",
fs,
appId.toString(),
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
flinkConf,
fs.getHomeDirectory());

localResources.put("flink-conf.yaml", flinkConf);
localResources,
homeDir,
"");

paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
@@ -781,19 +794,24 @@ public ApplicationReport startAppMaster(
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
LocalResource jobgraph = Records.newRecord(LocalResource.class);
Path remoteJobGraph =
Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
localResources.put("job.graph", jobgraph);
paths.add(remoteJobGraph);

Path pathFromYarnURL = setupSingleLocalResource(
"job.graph",
fs,
appId,
new Path(fp.toURI()),
localResources,
homeDir,
"");
paths.add(pathFromYarnURL);
classPathBuilder.append("job.graph").append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}

Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');

FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.
Expand All @@ -810,32 +828,44 @@ public ApplicationReport startAppMaster(
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
remoteKrb5Path = setupSingleLocalResource(
Utils.KRB5_FILE_NAME,
fs,
appId,
krb5ConfPath,
localResources,
homeDir,
"");

File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);

remoteYarnSiteXmlPath = setupSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
fs,
appId,
yarnSitePath,
localResources,
homeDir,
"");
hasKrb5 = true;
}
}

// setup security tokens
LocalResource keytabResource = null;
Path remotePathKeytab = null;
String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
keytabResource = Records.newRecord(LocalResource.class);
Path keytabPath = new Path(keytab);
remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
remotePathKeytab = setupSingleLocalResource(
Utils.KEYTAB_FILE_NAME,
fs,
appId,
new Path(keytab),
localResources,
homeDir,
"");
}

final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
@@ -866,7 +896,7 @@ public ApplicationReport startAppMaster(
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
@@ -876,7 +906,7 @@ public ApplicationReport startAppMaster(
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

if (keytabResource != null) {
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
@@ -981,25 +1011,54 @@ public ApplicationReport startAppMaster(
return report;
}

private static List<String> uploadAndRegisterFiles(
Collection<File> shipFiles,
/**
* Uploads and registers a single resource and adds it to <tt>localResources</tt>.
*
* @param key
* the key to add the resource under
* @param fs
* the remote file system to upload to
* @param appId
* application ID
* @param localSrcPath
* local path to the file
* @param localResources
* map of resources
* @param targetHomeDir
* remote home directory to place the resource in
* @param relativeTargetPath
* relative target path within the application's staging directory (empty string for none)
*
* @return the remote path to the uploaded resource
*/
private static Path setupSingleLocalResource(
String key,
FileSystem fs,
String appId,
List<Path> remotePaths,
ApplicationId appId,
Path localSrcPath,
Map<String, LocalResource> localResources,
StringBuilder envShipFileList) throws IOException {
final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
for (File shipFile : shipFiles) {
LocalResource shipResources = Records.newRecord(LocalResource.class);
Path targetHomeDir,
String relativeTargetPath) throws IOException, URISyntaxException {

Path shipLocalPath = new Path("file:https://" + shipFile.getAbsolutePath());
Path remotePath =
Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory());
Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
fs,
appId.toString(),
localSrcPath,
targetHomeDir,
relativeTargetPath);

remotePaths.add(remotePath);
localResources.put(key, resource.f1);

localResources.put(shipFile.getName(), shipResources);
return resource.f0;
}

static List<String> uploadAndRegisterFiles(
Collection<File> shipFiles,
FileSystem fs,
Path targetHomeDir,
ApplicationId appId,
List<Path> remotePaths,
Map<String, LocalResource> localResources,
StringBuilder envShipFileList) throws IOException, URISyntaxException {

final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
for (File shipFile : shipFiles) {
if (shipFile.isDirectory()) {
// add directories to the classpath
java.nio.file.Path shipPath = shipFile.toPath();
@@ -1011,17 +1070,40 @@ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes at
throws IOException {
java.nio.file.Path relativePath = parentPath.relativize(file);

classPaths.add(relativePath.toString());

return FileVisitResult.CONTINUE;
String key = relativePath.toString();
try {
Path remotePath = setupSingleLocalResource(
key,
fs,
appId,
new Path(file.toUri()),
localResources,
targetHomeDir,
relativePath.getParent().toString());
remotePaths.add(remotePath);
envShipFileList.append(key).append("=").append(remotePath).append(",");

// add files to the classpath
classPaths.add(key);

return FileVisitResult.CONTINUE;
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
});
} else {
Path shipLocalPath = new Path("file:https://" + shipFile.getAbsolutePath());
String key = shipFile.getName();
Path remotePath = setupSingleLocalResource(
key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
remotePaths.add(remotePath);
envShipFileList.append(key).append("=").append(remotePath).append(",");

// add files to the classpath
classPaths.add(shipFile.getName());
classPaths.add(key);
}

envShipFileList.append(remotePath).append(",");
}
return classPaths;
}
(The diffs of the remaining changed files are not shown here.)
