[FLINK-9366] DistributedCache works with Distributed File System
This closes apache#6107
dawidwys committed Jun 15, 2018 · 1 parent fc3ee68 · commit 167be0a
Showing 10 changed files with 284 additions and 32 deletions.
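Taken together, the change routes registered files down one of two paths: local files are shipped to the workers through the BlobServer, while files that already live on a distributed file system are only referenced in the job configuration and fetched by the task executors themselves. Below is a minimal, hedged sketch of the user-facing flow this enables; the hdfs:///tmp/words.txt path is a hypothetical placeholder, and the API calls mirror the test added further down in this commit:

import java.io.File;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// A DFS path is recorded as metadata in the job configuration;
		// a local path would instead be uploaded via the BlobServer.
		env.registerCachedFile("hdfs:///tmp/words.txt", "words");

		env.fromElements(1)
			.map(new RichMapFunction<Integer, String>() {
				@Override
				public String map(Integer value) throws Exception {
					// The runtime materializes the file in a local cache before the task runs.
					File local = getRuntimeContext().getDistributedCache().getFile("words");
					return local.getAbsolutePath();
				}
			})
			.print();

		env.execute("cached-file sketch");
	}
}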
DistributedCache.java
@@ -15,8 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.cache;

+import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;

import java.io.File;
import java.io.Serializable;
@@ -29,17 +29,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

-import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;

/**
* DistributedCache provides static methods to write the registered cache files into job configuration or decode
* them from job configuration. It also provides user access to the file locally.
*/
@Public
public class DistributedCache {

/**
* Meta info about an entry in {@link DistributedCache}.
*/
public static class DistributedCacheEntry implements Serializable {

public String filePath;
@@ -48,9 +48,9 @@ public static class DistributedCacheEntry implements Serializable {

public byte[] blobKey;

-public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped){
-this.filePath=filePath;
-this.isExecutable=isExecutable;
+public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped) {
+this.filePath = filePath;
+this.isExecutable = isExecutable;
this.blobKey = blobKey;
this.isZipped = isZipped;
}
@@ -104,13 +107,15 @@ public File getFile(String name) {
// ------------------------------------------------------------------------

public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
-int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
+int num = conf.getInteger(CACHE_FILE_NUM, 0) + 1;
conf.setInteger(CACHE_FILE_NUM, num);
conf.setString(CACHE_FILE_NAME + num, name);
conf.setString(CACHE_FILE_PATH + num, e.filePath);
conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
conf.setBoolean(CACHE_FILE_DIR + num, e.isZipped || new File(e.filePath).isDirectory());
-conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
+if (e.blobKey != null) {
+conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
+}
}

public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
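To make the new null guard concrete, here is a hedged, self-contained sketch (not part of the commit) of what writeFileInfoToConfig produces for a DFS-backed entry without a blob key. The literal key strings and the two-argument DistributedCacheEntry constructor are taken from the JobGraphTest changes at the bottom of this commit:

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;

public class WriteFileInfoSketch {
	public static void main(String[] args) {
		Configuration conf = new Configuration();

		// A DFS entry: no blob key, not executable.
		DistributedCache.writeFileInfoToConfig(
			"dfsFile",
			new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false),
			conf);

		System.out.println(conf.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));   // 1
		System.out.println(conf.getString("DISTRIBUTED_CACHE_FILE_PATH_1", "")); // hdfs://tmp/file
		// With the null guard above, no DISTRIBUTED_CACHE_FILE_BLOB_KEY_1 entry
		// is written when blobKey == null.
	}
}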
DistributedCacheDfsTest.java (new file)
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.NetUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via HDFS.
*/
public class DistributedCacheDfsTest {

private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";

@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
new org.apache.flink.configuration.Configuration(),
1,
1));

private static MiniDFSCluster hdfsCluster;
private static Configuration conf = new Configuration();

private static Path testFile;
private static Path testDir;

@BeforeClass
public static void setup() throws Exception {
File dataDir = TEMP_FOLDER.newFolder();

conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
hdfsCluster = builder.build();

String hdfsURI = "hdfs://"
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ "/";

FileSystem dfs = FileSystem.get(new URI(hdfsURI));
testFile = writeFile(dfs, dfs.getHomeDirectory(), "testFile");

testDir = new Path(dfs.getHomeDirectory(), "testDir");
dfs.mkdirs(testDir);
writeFile(dfs, testDir, "testFile1");
writeFile(dfs, testDir, "testFile2");
}

private static Path writeFile(FileSystem dfs, Path rootDir, String fileName) throws IOException {
Path file = new Path(rootDir, fileName);
try (
DataOutputStream outStream = new DataOutputStream(dfs.create(file,
FileSystem.WriteMode.OVERWRITE))) {
outStream.writeUTF(testFileContent);
}
return file;
}

@AfterClass
public static void teardown() {
hdfsCluster.shutdown();
}

@Test
public void testDistributeFileViaDFS() throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.registerCachedFile(testFile.toString(), "test_data", false);
env.registerCachedFile(testDir.toString(), "test_dir", false);

env.fromElements(1)
.map(new TestMapFunction())
.print();

env.execute("Distributed Cache Via Blob Test Program");
}

private static class TestMapFunction extends RichMapFunction<Integer, String> {

private static final long serialVersionUID = -3917258280687242969L;

@Override
public String map(Integer value) throws Exception {
final Path actualFile = new Path(getRuntimeContext().getDistributedCache().getFile("test_data").toURI());

Path path = new Path(actualFile.toUri());
assertFalse(path.getFileSystem().isDistributedFS());

DataInputStream in = new DataInputStream(actualFile.getFileSystem().open(actualFile));
String contents = in.readUTF();

assertEquals(testFileContent, contents);

final Path actualDir = new Path(getRuntimeContext().getDistributedCache().getFile("test_dir").toURI());
FileStatus fileStatus = actualDir.getFileSystem().getFileStatus(actualDir);
assertTrue(fileStatus.isDir());
FileStatus[] fileStatuses = actualDir.getFileSystem().listStatus(actualDir);
assertEquals(2, fileStatuses.length);

return contents;
}
}
}
ExecutionEnvironment.java
@@ -844,7 +844,7 @@ public JobExecutionResult execute() throws Exception {
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
- * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * may be local files (which will be distributed via BlobServer), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
*
* <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
@@ -862,7 +862,7 @@ public void registerCachedFile(String filePath, String name){
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
- * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * may be local files (which will be distributed via BlobServer), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
*
* <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
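A short, hedged illustration of the two documented cases; both paths are hypothetical, and the two- and three-argument overloads of registerCachedFile are the ones whose javadoc is updated above:

import org.apache.flink.api.java.ExecutionEnvironment;

public class RegisterCachedFileSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Local file: uploaded to the BlobServer at job submission.
		env.registerCachedFile("/tmp/lookup.csv", "lookupLocal");

		// DFS file: only metadata goes into the job configuration; each task
		// executor fetches the file from the distributed file system itself.
		env.registerCachedFile("hdfs:///data/lookup.csv", "lookupDfs", false);
	}
}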
FileCache.java
@@ -204,7 +204,12 @@ public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobI
Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);

// kick off the copying
-Callable<Path> cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+Callable<Path> cp;
+if (entry.blobKey != null) {
+cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+} else {
+cp = new CopyFromDFSProcess(entry, target);
+}
FutureTask<Path> copyTask = new FutureTask<>(cp);
executorService.submit(copyTask);

@@ -302,6 +307,30 @@ public Path call() throws IOException {
}
}

/**
* Asynchronous file copy process.
*/
private static class CopyFromDFSProcess implements Callable<Path> {

private final Path filePath;
private final Path cachedPath;
private boolean executable;

public CopyFromDFSProcess(DistributedCacheEntry e, Path cachedPath) {
this.filePath = new Path(e.filePath);
this.executable = e.isExecutable;
this.cachedPath = cachedPath;
}

@Override
public Path call() throws IOException {
// let exceptions propagate. we can retrieve them later from
// the future and report them upon access to the result
FileUtils.copy(filePath, cachedPath, this.executable);
return cachedPath;
}
}

/**
* If no task is using this file after 5 seconds, clear it.
*/
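Given the "let exceptions propagate" comment in call(), here is an illustrative, self-contained sketch (not from the commit) of how an IOException raised during the copy surfaces only when the consumer accesses the returned future:

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import org.apache.flink.core.fs.Path;

public class CopyFutureSketch {
	public static void main(String[] args) throws InterruptedException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		// Stand-in for CopyFromDFSProcess: a Callable<Path> whose call() throws.
		FutureTask<Path> copyTask = new FutureTask<>(() -> {
			throw new IOException("simulated DFS copy failure");
		});
		executorService.submit(copyTask);

		try {
			copyTask.get();
		} catch (ExecutionException e) {
			// The IOException is wrapped and reported only on access to the result,
			// matching the comment in call() above.
			System.out.println("copy failed: " + e.getCause());
		} finally {
			executorService.shutdown();
		}
	}
}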
JobGraph.java
@@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -580,11 +581,54 @@ public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}

-public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
-if (!userArtifacts.isEmpty()) {
-try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
-for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
/**
* Configures the JobGraph with the user-specified artifacts. Files on the local file system are uploaded
* to the BLOB server; for files on a distributed file system, only metadata is written, for later remote access from the task executors.
*
* @param blobServerAddress of the blob server to upload the files to
* @param blobClientConfig the blob client configuration
* @throws IOException Thrown, if the file upload to the Blob server failed.
*/
public void uploadUserArtifacts(
InetSocketAddress blobServerAddress,
Configuration blobClientConfig) throws IOException {

Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
Path filePath = new Path(userArtifact.getValue().filePath);

try {
if (filePath.getFileSystem().isDistributedFS()) {
distributeViaDFS.add(userArtifact);
} else {
uploadToBlobServer.add(userArtifact);
}

} catch (IOException ex) {
distributeViaDFS.add(userArtifact);
}
}

uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
DistributedCache.writeFileInfoToConfig(
userArtifact.getKey(),
userArtifact.getValue(),
jobConfiguration
);
}
}

private void uploadViaBlob(
InetSocketAddress blobServerAddress,
Configuration clientConfig,
Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer) throws IOException {
if (!uploadToBlobServer.isEmpty()) {
try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : uploadToBlobServer) {
final PermanentBlobKey key = blobClient.uploadFile(jobID,
new Path(userArtifact.getValue().filePath));

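The routing above hinges on FileSystem#isDistributedFS, with the catch block deferring schemes that cannot be resolved on the client to DFS-style distribution. An illustrative, self-contained probe of that decision (the paths and printed strings are made up):

import java.io.IOException;

import org.apache.flink.core.fs.Path;

public class ArtifactRoutingSketch {
	public static void main(String[] args) {
		for (String candidate : new String[] {"/tmp/local-file", "hdfs://namenode:9000/tmp/file"}) {
			Path filePath = new Path(candidate);
			try {
				if (filePath.getFileSystem().isDistributedFS()) {
					System.out.println(candidate + " -> metadata into job configuration");
				} else {
					System.out.println(candidate + " -> upload via BlobClient");
				}
			} catch (IOException ex) {
				// Scheme not resolvable on the client: fall back to DFS-style
				// distribution, like the catch block in uploadUserArtifacts above.
				System.out.println(candidate + " -> distribute via DFS (unresolved here)");
			}
		}
	}
}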
JobGraphTest.java
@@ -20,9 +20,12 @@

import static org.junit.Assert.*;

import java.net.InetSocketAddress;
import java.util.List;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.junit.Test;
@@ -258,6 +261,22 @@ public void testTopoSortCyclicGraphIntermediateCycle() {
fail(e.getMessage());
}
}

@Test
public void testConfiguringDistributedCache() throws Exception {
JobGraph testJob = new JobGraph("Test job");
testJob.addUserArtifact("dfsFile", new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false));

// it should never try to connect to that address
testJob.uploadUserArtifacts(new InetSocketAddress("localhost", 1111), new Configuration());

Configuration jobConfiguration = testJob.getJobConfiguration();
assertEquals(1, jobConfiguration.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));
assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_DIR_1", true));
assertEquals("dfsFile", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_NAME_1", ""));
assertEquals("hdfs:https://tmp/file", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_PATH_1", ""));
assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_EXE_1", true));
}

private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
boolean seenFirst = false;