Skip to content

Commit

Permalink
[runtime] Improve error messages and add tests for failed library dow…
Browse files Browse the repository at this point in the history
…nloads on TaskManagers
  • Loading branch information
StephanEwen committed Mar 30, 2015
1 parent 01adab5 commit 2667f0e
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
}
}
catch (IOException e) {
String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + '.';
String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
" and store it under " + localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
attempt++;
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
* caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
* programming pattern, so there exists at most one library manager at a time.
* <p>
* This class is thread-safe.
*/
public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {

Expand Down Expand Up @@ -236,20 +234,20 @@ private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException
URL url = blobService.getURL(key);

Integer references = blobKeyReferenceCounters.get(key);
int newReferences = references == null ? 1 : references.intValue() + 1;
int newReferences = references == null ? 1 : references + 1;
blobKeyReferenceCounters.put(key, newReferences);

return url;
}
catch (IOException e) {
throw new IOException("Cannot access jar file stored under " + key, e);
throw new IOException("Cannot get library with hash " + key, e);
}
}

private void unregisterReferenceToBlobKey(BlobKey key) {
Integer references = blobKeyReferenceCounters.get(key);
if (references != null) {
int newReferences = Math.max(references.intValue() - 1, 0);
int newReferences = Math.max(references - 1, 0);
blobKeyReferenceCounters.put(key, newReferences);
}
else {
Expand All @@ -261,7 +259,12 @@ private void unregisterReferenceToBlobKey(BlobKey key) {


// --------------------------------------------------------------------------------------------


/**
* An entry in the per-job library cache. Tracks which execution attempts
* still reference the libraries. Once none reference it any more, the
* libraries can be cleaned up.
*/
private static class LibraryCacheEntry {

private final ClassLoader classLoader;
Expand Down Expand Up @@ -289,7 +292,8 @@ public Set<BlobKey> getLibraries() {

public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
if (!libraries.containsAll(keys)) {
throw new IllegalStateException("The library registration references a different set of libraries than previous registrations for this job.");
throw new IllegalStateException(
"The library registration references a different set of libraries than previous registrations for this job.");
}

this.referenceHolders.add(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.Test;

import static org.junit.Assert.*;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class BlobLibraryCacheManagerTest {

@Test
public void testLibraryCacheManagerCleanup(){
public void testLibraryCacheManagerCleanup() {
Configuration config = new Configuration();

config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1);
Expand Down Expand Up @@ -65,7 +70,7 @@ public void testLibraryCacheManagerCleanup(){

List<File> files = new ArrayList<File>();

for (BlobKey key: keys){
for (BlobKey key : keys) {
files.add(libraryCacheManager.getFile(key));
}

Expand All @@ -81,10 +86,10 @@ public void testLibraryCacheManagerCleanup(){
do {
Thread.sleep(500);
}
while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
System.currentTimeMillis() < deadline);
}

// this fails if we exited via a timeout
assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());

Expand All @@ -94,31 +99,127 @@ public void testLibraryCacheManagerCleanup(){
// the blob cache should no longer contain the files
try {
files.add(libraryCacheManager.getFile(key));
} catch (IOException ioe) {
}
catch (IOException ioe) {
caughtExceptions++;
}
}

assertEquals(2, caughtExceptions);

bc.close();
}
catch (Exception e){
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally{
if (server != null){
finally {
if (server != null) {
server.shutdown();
}

if (libraryCacheManager != null){
if (libraryCacheManager != null) {
try {
libraryCacheManager.shutdown();
} catch (IOException e) {
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
 * Exercises task registration in the {@link BlobLibraryCacheManager} against a real
 * {@link BlobServer} / {@link BlobCache} pair.
 *
 * <p>The test first registers and un-registers a task for an uploaded BLOB and checks the
 * number of reference holders and cached libraries along the way. It then removes the write
 * permission from the cache directory and expects that registering a task for a second BLOB,
 * which has not been registered before, fails with an {@link IOException}.
 */
@Test
public void testRegisterAndDownload() {
BlobServer server = null;
BlobCache cache = null;
File cacheDir = null;
try {
// create the blob transfer services
Configuration config = new Configuration();
server = new BlobServer(config);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
cache = new BlobCache(serverAddress, config);

// upload some meaningless data to the server
// NOTE(review): the uploader is only closed on the success path; if a put() throws,
// close() is skipped
BlobClient uploader = new BlobClient(serverAddress);
BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
uploader.close();

// NOTE(review): the second argument is presumably the cleanup interval, chosen very
// large so that no cleanup interferes with the test - confirm against the constructor
BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);

// nothing has been registered yet
assertEquals(0, libCache.getNumberOfCachedLibraries());

// first try to access a non-existing entry
try {
libCache.getClassLoader(new JobID());
fail("Should fail with an IllegalStateException");
}
catch (IllegalStateException e) {
// that's what we want: this job was never registered
}

// now register some BLOBs as libraries
{
JobID jid = new JobID();
ExecutionAttemptID executionId = new ExecutionAttemptID();
Collection<BlobKey> keys = Collections.singleton(dataKey1);

libCache.registerTask(jid, executionId, keys);
assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
assertEquals(1, libCache.getNumberOfCachedLibraries());
assertNotNull(libCache.getClassLoader(jid));

// un-register them again
libCache.unregisterTask(jid, executionId);
assertEquals(0, libCache.getNumberOfReferenceHolders(jid));

// library is still cached (but not associated with job any more)
assertEquals(1, libCache.getNumberOfCachedLibraries());

// should not be able to access the classloader any more
try {
libCache.getClassLoader(jid);
fail("Should fail with an IllegalStateException");
}
catch (IllegalStateException e) {
// that's what we want: the job's only reference holder was un-registered above
}
}

// the sub-directory named "cache" inside the BLOB cache's storage directory
// (presumably where downloaded BLOBs are stored - confirm against BlobCache)
cacheDir = new File(cache.getStorageDir(), "cache");
assertTrue(cacheDir.exists());

// make sure no further blobs can be downloaded by removing the write
// permissions from the directory
// NOTE(review): a privileged user (e.g. root) may still be able to write to the
// directory, in which case the expected IOException below would not occur - verify
// on the build environment
assertTrue("Could not remove write permissions from cache directory", cacheDir.setWritable(false, false));

// since we cannot download this library any more, this call should fail
try {
libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2));
fail("This should fail with an IOException");
}
catch (IOException e) {
// splendid!
}
}
catch (Exception e) {
// any other exception fails the test, with the stack trace printed for diagnosis
e.printStackTrace();
fail(e.getMessage());
}
finally {
// restore the write permission that was removed above; only report (do not fail)
// if that is not possible
if (cacheDir != null) {
if (!cacheDir.setWritable(true, false)) {
System.err.println("Could not re-add write permissions to cache directory.");
}
}
// NOTE(review): only the BLOB cache and the server are shut down here; the
// BlobLibraryCacheManager created above is not shut down by this method
if (cache != null) {
cache.shutdown();
}
if (server != null) {
server.shutdown();
}
}
}
}

0 comments on commit 2667f0e

Please sign in to comment.