From cf8504dba606ee758ac16867423e65dbf6afc23a Mon Sep 17 00:00:00 2001
From: Nico Kruber
Date: Wed, 9 Nov 2016 15:04:50 -0500
Subject: [PATCH] [FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ includes a new unit test for recursive uploads to hdfs:// targets
+ add a unit test for recursive file uploads to s3:// via s3a

[FLINK-4228][yarn/s3] turn the dependencies around

Instead of having flink-s3-fs-hadoop depend on flink-yarn_, let flink-yarn
depend on the s3 filesystem and implement the test there. This is safer
with regard to the scala-independent flink-s3-fs-hadoop project.

[FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations

This is how YARN would use it and what should really be tested.

[FLINK-4228][yarn] enable S3 tests for newer Hadoop versions

- requires the 'include_hadoop_aws' build profile (or property) to be set
- requires a newer aws-sdk version (than Hadoop pulls in) to work with our
  httpcomponents version
- we also add a check that at least one S3 implementation is tested to not
  silently ignore all tests because of such a missing dependency

This closes #4939.
---
 .travis.yml                                        |   2 +-
 flink-yarn/pom.xml                                 |  63 +++++
 .../yarn/AbstractYarnClusterDescriptor.java        | 192 ++++++++++-----
 .../java/org/apache/flink/yarn/Utils.java          |  86 ++++---
 .../apache/flink/yarn/YarnFileStageTest.java       | 218 +++++++++++++++++
 .../flink/yarn/YarnFileStageTestS3ITCase.java      | 220 ++++++++++++++++++
 6 files changed, 698 insertions(+), 83 deletions(-)
 create mode 100644 flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
 create mode 100644 flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java

diff --git a/.travis.yml b/.travis.yml
index daf2186c09d27..5e2ef74aa8d28 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,7 +59,7 @@ matrix:
     - jdk: "oraclejdk8"
       env:
         - TEST="misc"
-        - PROFILE="-Dhadoop.version=2.8.0"
+        - PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws"
         - CACHE_NAME=JDK8_H280_M
    - jdk: "openjdk8"
      env:
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 5eafcc42421b0..6fc589edb4786 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -31,6 +31,12 @@ under the License.
 
 	<name>flink-yarn</name>
 	<packaging>jar</packaging>
 
+	<properties>
+		<aws.sdk.version>1.11.171</aws.sdk.version>
+	</properties>
+
@@ -153,6 +159,63 @@ under the License.
+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-s3</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-sts</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8ecc371d798f4..5ac5c4ea4fde9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
@@ -624,6 +625,7 @@ public ApplicationReport startAppMaster(
 		// Copy the application master jar to the filesystem
 		// Create a local resource to point to the destination jar path
 		final FileSystem fs = FileSystem.get(conf);
+		final Path homeDir = fs.getHomeDirectory();
 
 		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
 		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
@@ -705,11 +707,25 @@ public ApplicationReport startAppMaster(
 		StringBuilder envShipFileList = new StringBuilder();
 
 		// upload and register ship files
-		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+		List<String> systemClassPaths = uploadAndRegisterFiles(
+			systemShipFiles,
+			fs,
+			homeDir,
+			appId,
+			paths,
+			localResources,
+			envShipFileList);
 		List<String> userClassPaths;
 		if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
-			userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+			userClassPaths = uploadAndRegisterFiles(
+				userJarFiles,
+				fs,
+				homeDir,
+				appId,
+				paths,
+				localResources,
+				envShipFileList);
 		} else {
 			userClassPaths = Collections.emptyList();
 		}
@@ -739,32 +755,29 @@ public ApplicationReport startAppMaster(
 		}
 
 		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(
+		Path remotePathJar = setupSingleLocalResource(
+			"flink.jar",
 			fs,
-			appId.toString(),
+			appId,
 			flinkJarPath,
-			appMasterJar,
-			fs.getHomeDirectory());
-
-		localResources.put("flink.jar", appMasterJar);
+			localResources,
+			homeDir,
+			"");
 
 		// Upload the flink configuration
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
 		// write out configuration file
 		File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
 		tmpConfigurationFile.deleteOnExit();
 		BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);
 
-		Path remotePathConf = Utils.setupLocalResource(
+		Path remotePathConf = setupSingleLocalResource(
+			"flink-conf.yaml",
 			fs,
-			appId.toString(),
+			appId,
 			new Path(tmpConfigurationFile.getAbsolutePath()),
-			flinkConf,
-			fs.getHomeDirectory());
-
-		localResources.put("flink-conf.yaml", flinkConf);
+			localResources,
+			homeDir,
+			"");
 
 		paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator); @@ -781,11 +794,16 @@ public ApplicationReport startAppMaster( ObjectOutputStream obOutput = new ObjectOutputStream(output);){ obOutput.writeObject(jobGraph); } - LocalResource jobgraph = Records.newRecord(LocalResource.class); - Path remoteJobGraph = - Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory()); - localResources.put("job.graph", jobgraph); - paths.add(remoteJobGraph); + + Path pathFromYarnURL = setupSingleLocalResource( + "job.graph", + fs, + appId, + new Path(fp.toURI()), + localResources, + homeDir, + ""); + paths.add(pathFromYarnURL); classPathBuilder.append("job.graph").append(File.pathSeparator); } catch (Exception e) { LOG.warn("Add job graph to local resource fail"); @@ -793,7 +811,7 @@ public ApplicationReport startAppMaster( } } - Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/'); + Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/'); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); fs.setPermission(yarnFilesDir, permission); // set permission for path. @@ -810,32 +828,44 @@ public ApplicationReport startAppMaster( if (krb5Config != null && krb5Config.length() != 0) { File krb5 = new File(krb5Config); LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath()); - LocalResource krb5ConfResource = Records.newRecord(LocalResource.class); Path krb5ConfPath = new Path(krb5.getAbsolutePath()); - remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory()); - localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); + remoteKrb5Path = setupSingleLocalResource( + Utils.KRB5_FILE_NAME, + fs, + appId, + krb5ConfPath, + localResources, + homeDir, + ""); File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME); LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath()); - LocalResource yarnConfResource = Records.newRecord(LocalResource.class); Path yarnSitePath = new Path(f.getAbsolutePath()); - remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory()); - localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); - + remoteYarnSiteXmlPath = setupSingleLocalResource( + Utils.YARN_SITE_FILE_NAME, + fs, + appId, + yarnSitePath, + localResources, + homeDir, + ""); hasKrb5 = true; } } // setup security tokens - LocalResource keytabResource = null; Path remotePathKeytab = null; String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); if (keytab != null) { LOG.info("Adding keytab {} to the AM container local resource bucket", keytab); - keytabResource = Records.newRecord(LocalResource.class); - Path keytabPath = new Path(keytab); - remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory()); - localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); + remotePathKeytab = setupSingleLocalResource( + Utils.KEYTAB_FILE_NAME, + fs, + appId, + new Path(keytab), + localResources, + homeDir, + ""); } final ContainerLaunchContext amContainer = setupApplicationMasterContainer( @@ -866,7 +896,7 @@ public ApplicationReport startAppMaster( appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB())); 
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString()); appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); - appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString()); + appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); @@ -876,7 +906,7 @@ public ApplicationReport startAppMaster( // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - if (keytabResource != null) { + if (remotePathKeytab != null) { appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString()); String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal); @@ -981,25 +1011,54 @@ public ApplicationReport startAppMaster( return report; } - private static List uploadAndRegisterFiles( - Collection shipFiles, + /** + * Uploads and registers a single resource and adds it to localResources. + * + * @param key + * the key to add the resource under + * @param fs + * the remote file system to upload to + * @param appId + * application ID + * @param localSrcPath + * local path to the file + * @param localResources + * map of resources + * + * @return the remote path to the uploaded resource + */ + private static Path setupSingleLocalResource( + String key, FileSystem fs, - String appId, - List remotePaths, + ApplicationId appId, + Path localSrcPath, Map localResources, - StringBuilder envShipFileList) throws IOException { - final List classPaths = new ArrayList<>(2 + shipFiles.size()); - for (File shipFile : shipFiles) { - LocalResource shipResources = Records.newRecord(LocalResource.class); + Path targetHomeDir, + String relativeTargetPath) throws IOException, URISyntaxException { - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - Path remotePath = - Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory()); + Tuple2 resource = Utils.setupLocalResource( + fs, + appId.toString(), + localSrcPath, + targetHomeDir, + relativeTargetPath); - remotePaths.add(remotePath); + localResources.put(key, resource.f1); - localResources.put(shipFile.getName(), shipResources); + return resource.f0; + } + + static List uploadAndRegisterFiles( + Collection shipFiles, + FileSystem fs, + Path targetHomeDir, + ApplicationId appId, + List remotePaths, + Map localResources, + StringBuilder envShipFileList) throws IOException, URISyntaxException { + final List classPaths = new ArrayList<>(2 + shipFiles.size()); + for (File shipFile : shipFiles) { if (shipFile.isDirectory()) { // add directories to the classpath java.nio.file.Path shipPath = shipFile.toPath(); @@ -1011,17 +1070,40 @@ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes at throws IOException { java.nio.file.Path relativePath = parentPath.relativize(file); - classPaths.add(relativePath.toString()); - - return FileVisitResult.CONTINUE; + String key = relativePath.toString(); + try { + Path remotePath = setupSingleLocalResource( + key, + 
fs, + appId, + new Path(file.toUri()), + localResources, + targetHomeDir, + relativePath.getParent().toString()); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + + // add files to the classpath + classPaths.add(key); + + return FileVisitResult.CONTINUE; + } catch (URISyntaxException e) { + throw new IOException(e); + } } }); } else { + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + String key = shipFile.getName(); + Path remotePath = setupSingleLocalResource( + key, fs, appId, shipLocalPath, localResources, targetHomeDir, ""); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + // add files to the classpath - classPaths.add(shipFile.getName()); + classPaths.add(key); } - envShipFileList.append(remotePath).append(","); } return classPaths; } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 32cbb64cb052d..652afec370e43 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -117,33 +118,60 @@ public static void setupYarnClassPath(Configuration conf, Map ap } /** + * Copy a local file to a remote file system. + * + * @param fs + * remote filesystem + * @param appId + * application ID + * @param localSrcPath + * path to the local file + * @param homedir + * remote home directory base (will be extended) + * @param relativeTargetPath + * relative target path of the file (will be prefixed be the full home directory we set up) + * * @return Path to remote file (usually hdfs) - * @throws IOException */ - public static Path setupLocalResource( - FileSystem fs, - String appId, Path localRsrcPath, - LocalResource appMasterJar, - Path homedir) throws IOException { + static Tuple2 setupLocalResource( + FileSystem fs, + String appId, + Path localSrcPath, + Path homedir, + String relativeTargetPath) throws IOException { + + if (new File(localSrcPath.toUri().getPath()).isDirectory()) { + throw new IllegalArgumentException("File to copy must not be a directory: " + + localSrcPath); + } // copy resource to HDFS - String suffix = ".flink/" + appId + "/" + localRsrcPath.getName(); + String suffix = + ".flink/" + + appId + + (relativeTargetPath.isEmpty() ? 
"" : "/" + relativeTargetPath) + + "/" + localSrcPath.getName(); Path dst = new Path(homedir, suffix); - LOG.info("Copying from " + localRsrcPath + " to " + dst); - fs.copyFromLocalFile(localRsrcPath, dst); - registerLocalResource(fs, dst, appMasterJar); - return dst; + LOG.info("Copying from " + localSrcPath + " to " + dst); + + fs.copyFromLocalFile(false, true, localSrcPath, dst); + + // now create the resource instance + LocalResource resource = registerLocalResource(fs, dst); + return Tuple2.of(dst, resource); } - public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException { + private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException { + LocalResource localResource = Records.newRecord(LocalResource.class); FileStatus jarStat = fs.getFileStatus(remoteRsrcPath); localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri())); localResource.setSize(jarStat.getLen()); localResource.setTimestamp(jarStat.getModificationTime()); localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION); + return localResource; } public static void setTokensFor(ContainerLaunchContext amContainer, List paths, Configuration conf) throws IOException { @@ -340,10 +368,9 @@ static ContainerLaunchContext createTaskExecutorContext( LocalResource keytabResource = null; if (remoteKeytabPath != null) { log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); - keytabResource = Records.newRecord(LocalResource.class); Path keytabPath = new Path(remoteKeytabPath); FileSystem fs = keytabPath.getFileSystem(yarnConfig); - registerLocalResource(fs, keytabPath, keytabResource); + keytabResource = registerLocalResource(fs, keytabPath); } //To support Yarn Secure Integration Test Scenario @@ -352,30 +379,28 @@ static ContainerLaunchContext createTaskExecutorContext( boolean hasKrb5 = false; if (remoteYarnConfPath != null && remoteKrb5Path != null) { log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); - yarnConfResource = Records.newRecord(LocalResource.class); Path yarnConfPath = new Path(remoteYarnConfPath); FileSystem fs = yarnConfPath.getFileSystem(yarnConfig); - registerLocalResource(fs, yarnConfPath, yarnConfResource); + yarnConfResource = registerLocalResource(fs, yarnConfPath); log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); - krb5ConfResource = Records.newRecord(LocalResource.class); Path krb5ConfPath = new Path(remoteKrb5Path); fs = krb5ConfPath.getFileSystem(yarnConfig); - registerLocalResource(fs, krb5ConfPath, krb5ConfResource); + krb5ConfResource = registerLocalResource(fs, krb5ConfPath); hasKrb5 = true; } // register Flink Jar with remote HDFS - LocalResource flinkJar = Records.newRecord(LocalResource.class); + final LocalResource flinkJar; { Path remoteJarPath = new Path(remoteFlinkJarPath); FileSystem fs = remoteJarPath.getFileSystem(yarnConfig); - registerLocalResource(fs, remoteJarPath, flinkJar); + flinkJar = registerLocalResource(fs, remoteJarPath); } // register conf with local fs - LocalResource flinkConf = Records.newRecord(LocalResource.class); + final LocalResource flinkConf; { // write the TaskManager configuration to a local file final File taskManagerConfigFile = @@ -385,8 +410,13 @@ static ContainerLaunchContext createTaskExecutorContext( Path homeDirPath = new Path(clientHomeDir); 
FileSystem fs = homeDirPath.getFileSystem(yarnConfig); - setupLocalResource(fs, appId, - new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); + + flinkConf = setupLocalResource( + fs, + appId, + new Path(taskManagerConfigFile.toURI()), + homeDirPath, + "").f1; log.info("Prepared local resource for modified yaml: {}", flinkConf); } @@ -408,10 +438,11 @@ static ContainerLaunchContext createTaskExecutorContext( // prepare additional files to be shipped for (String pathStr : shipListString.split(",")) { if (!pathStr.isEmpty()) { - LocalResource resource = Records.newRecord(LocalResource.class); - Path path = new Path(pathStr); - registerLocalResource(path.getFileSystem(yarnConfig), path, resource); - taskManagerLocalResources.put(path.getName(), resource); + String[] keyAndPath = pathStr.split("="); + require(keyAndPath.length == 2, "Invalid entry in ship file list: %s", pathStr); + Path path = new Path(keyAndPath[1]); + LocalResource resource = registerLocalResource(path.getFileSystem(yarnConfig), path); + taskManagerLocalResources.put(keyAndPath[0], resource); } } @@ -488,4 +519,5 @@ static void require(boolean condition, String message, Object... values) { throw new RuntimeException(String.format(message, values)); } } + } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java new file mode 100644 index 0000000000000..4d3825311aef3 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Tests for verifying file staging during submission to YARN works. + */ +public class YarnFileStageTest extends TestLogger { + + @ClassRule + public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static MiniDFSCluster hdfsCluster; + + private static Path hdfsRootPath; + + private org.apache.hadoop.conf.Configuration hadoopConfig; + + // ------------------------------------------------------------------------ + // Test setup and shutdown + // ------------------------------------------------------------------------ + + @BeforeClass + public static void createHDFS() throws Exception { + Assume.assumeTrue(!OperatingSystem.isWindows()); + + final File tempDir = CLASS_TEMP_DIR.newFolder(); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + hdfsRootPath = new Path(hdfsCluster.getURI()); + } + + @AfterClass + public static void destroyHDFS() { + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + hdfsCluster = null; + hdfsRootPath = null; + } + + @Before + public void initConfig() { + hadoopConfig = new org.apache.hadoop.conf.Configuration(); + hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); + } + + /** + * Verifies that nested directories are properly copied with a hdfs:// file + * system (from a file:///absolute/path source path). + */ + @Test + public void testCopyFromLocalRecursiveWithScheme() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true); + } + + /** + * Verifies that nested directories are properly copied with a hdfs:// file + * system (from a /absolute/path source path). 
+ */ + @Test + public void testCopyFromLocalRecursiveWithoutScheme() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, false); + } + + /** + * Verifies that nested directories are properly copied with the given filesystem and paths. + * + * @param targetFileSystem + * file system of the target path + * @param targetDir + * target path (URI like hdfs://...) + * @param tempFolder + * JUnit temporary folder rule to create the source directory with + * @param addSchemeToLocalPath + * whether add the file:// scheme to the local path to copy from + */ + public static void testCopyFromLocalRecursive( + FileSystem targetFileSystem, + Path targetDir, + TemporaryFolder tempFolder, + boolean addSchemeToLocalPath) throws Exception { + + // directory must not yet exist + assertFalse(targetFileSystem.exists(targetDir)); + + final File srcDir = tempFolder.newFolder(); + final Path srcPath; + if (addSchemeToLocalPath) { + srcPath = new Path("file://" + srcDir.getAbsolutePath()); + } else { + srcPath = new Path(srcDir.getAbsolutePath()); + } + + HashMap srcFiles = new HashMap<>(4); + + // create and fill source files + srcFiles.put("1", "Hello 1"); + srcFiles.put("2", "Hello 2"); + srcFiles.put("nested/3", "Hello nested/3"); + srcFiles.put("nested/4/5", "Hello nested/4/5"); + for (Map.Entry src : srcFiles.entrySet()) { + File file = new File(srcDir, src.getKey()); + //noinspection ResultOfMethodCallIgnored + file.getParentFile().mkdirs(); + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) { + out.writeUTF(src.getValue()); + } + } + + // copy the created directory recursively: + try { + List remotePaths = new ArrayList<>(); + HashMap localResources = new HashMap<>(); + AbstractYarnClusterDescriptor.uploadAndRegisterFiles( + Collections.singletonList(new File(srcPath.toUri().getPath())), + targetFileSystem, + targetDir, + ApplicationId.newInstance(0, 0), + remotePaths, + localResources, + new StringBuilder()); + assertEquals(srcFiles.size(), localResources.size()); + + Path workDir = ConverterUtils + .getPathFromYarnURL(localResources.get(srcPath.getName() + "/1").getResource()) + .getParent(); + + RemoteIterator targetFilesIterator = + targetFileSystem.listFiles(workDir, true); + HashMap targetFiles = + new HashMap<>(4); + + final int workDirPrefixLength = + workDir.toString().length() + 1; // one more for the concluding "/" + while (targetFilesIterator.hasNext()) { + LocatedFileStatus targetFile = targetFilesIterator.next(); + + try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) { + String absolutePathString = targetFile.getPath().toString(); + String relativePath = absolutePathString.substring(workDirPrefixLength); + targetFiles.put(relativePath, in.readUTF()); + + assertEquals("extraneous data in file " + relativePath, -1, in.read()); + } + } + + assertThat(targetFiles, equalTo(srcFiles)); + } finally { + // clean up + targetFileSystem.delete(targetDir, true); + } + } +} diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java new file mode 100644 index 0000000000000..74fb5963179d6 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeNoException; + +/** + * Tests for verifying file staging during submission to YARN works with the S3A file system. + * + *
<p>
Note that the setup is similar to org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase. + */ +public class YarnFileStageTestS3ITCase extends TestLogger { + + private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET"); + + private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID(); + + private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY"); + private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY"); + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + /** + * Number of tests executed. + */ + private static int numRecursiveUploadTests = 0; + + /** + * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped. + */ + private static boolean skipTest = true; + + @BeforeClass + public static void checkCredentialsAndSetup() throws IOException { + // check whether credentials exist + Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null); + Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null); + Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null); + + skipTest = false; + + setupCustomHadoopConfig(); + } + + @AfterClass + public static void resetFileSystemConfiguration() throws IOException { + FileSystem.initialize(new Configuration()); + } + + @AfterClass + public static void checkAtLeastOneTestRun() { + if (!skipTest) { + assertThat( + "No S3 filesystem upload test executed. Please activate the " + + "'include_hadoop_aws' build profile or set '-Dinclude_hadoop_aws' during build " + + "(Hadoop >= 2.6 moved S3 filesystems out of hadoop-common).", + numRecursiveUploadTests, greaterThan(0)); + } + } + + /** + * Create a Hadoop config file containing S3 access credentials. + * + *
<p>
Note that we cannot use them as part of the URL since this may fail if the credentials
+	 * contain a "/" (see HADOOP-3733).
+	 */
+	private static void setupCustomHadoopConfig() throws IOException {
+		File hadoopConfig = TEMP_FOLDER.newFile();
+		Map<String, String> parameters = new HashMap<>();
+
+		// set all different S3 fs implementation variants' configuration keys
+		parameters.put("fs.s3a.access.key", ACCESS_KEY);
+		parameters.put("fs.s3a.secret.key", SECRET_KEY);
+
+		parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
+		parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);
+
+		parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
+		parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);
+
+		try (PrintStream out = new PrintStream(new FileOutputStream(hadoopConfig))) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			for (Map.Entry<String, String> entry : parameters.entrySet()) {
+				out.println("\t<property>");
+				out.println("\t\t<name>" + entry.getKey() + "</name>");
+				out.println("\t\t<value>" + entry.getValue() + "</value>");
+				out.println("\t</property>");
+			}
+			out.println("</configuration>");
+		}
+
+		final Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.HDFS_SITE_CONFIG, hadoopConfig.getAbsolutePath());
+
+		FileSystem.initialize(conf);
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied to the given S3 path (using the
+	 * appropriate file system) during resource uploads for YARN.
+	 *
+	 * @param scheme
+	 * 		file system scheme
+	 * @param pathSuffix
+	 * 		test path suffix which will be the test's target path
+	 */
+	private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
+		++numRecursiveUploadTests;
+
+		final Path basePath = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
+		final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();
+
+		assumeFalse(fs.exists(basePath));
+
+		try {
+			final Path directory = new Path(basePath, pathSuffix);
+
+			YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
+				new org.apache.hadoop.fs.Path(directory.toUri()), tempFolder, true);
+
+			// now directory must be gone
+			assertFalse(fs.exists(directory));
+		} finally {
+			// clean up
+			fs.delete(basePath, true);
+		}
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with s3a:// file
+	 * systems during resource uploads for YARN.
+ */ + @Test + public void testRecursiveUploadForYarnS3() throws Exception { + try { + Class.forName("org.apache.hadoop.fs.s3.S3FileSystem"); + } catch (ClassNotFoundException e) { + // not in the classpath, cannot run this test + String msg = "Skipping test because S3FileSystem is not in the class path"; + log.info(msg); + assumeNoException(msg, e); + } + testRecursiveUploadForYarn("s3", "testYarn-s3"); + } + + @Test + public void testRecursiveUploadForYarnS3n() throws Exception { + try { + Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + } catch (ClassNotFoundException e) { + // not in the classpath, cannot run this test + String msg = "Skipping test because NativeS3FileSystem is not in the class path"; + log.info(msg); + assumeNoException(msg, e); + } + testRecursiveUploadForYarn("s3n", "testYarn-s3n"); + } + + @Test + public void testRecursiveUploadForYarnS3a() throws Exception { + try { + Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem"); + } catch (ClassNotFoundException e) { + // not in the classpath, cannot run this test + String msg = "Skipping test because S3AFileSystem is not in the class path"; + log.info(msg); + assumeNoException(msg, e); + } + testRecursiveUploadForYarn("s3a", "testYarn-s3a"); + } +}
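
To exercise the new S3 staging tests locally rather than on Travis, a minimal sketch of the required setup; the environment variable names and the include_hadoop_aws property come from this patch, while the bucket value and the Maven module selection via -pl are illustrative placeholders:

    export ARTIFACTS_AWS_BUCKET=<your-test-bucket>
    export ARTIFACTS_AWS_ACCESS_KEY=<your-access-key>
    export ARTIFACTS_AWS_SECRET_KEY=<your-secret-key>
    mvn verify -pl flink-yarn -Dhadoop.version=2.8.0 -Dinclude_hadoop_aws

If any of the three variables is missing, YarnFileStageTestS3ITCase skips itself through the Assume checks in checkCredentialsAndSetup(); checkAtLeastOneTestRun() only fails the build when credentials were present but no S3 filesystem implementation was on the test classpath.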