Skip to content

Commit

Permalink
[FLINK-17463][tests] Avoid concurrent directory creation and deletion
Browse files Browse the repository at this point in the history
BlobCacheCleanupTest#testPermanentBlobCleanup() tests that job related
files are cleaned up by a background task when the job is released from
the PermanentBlobCache. The tests asserts that the uploaded blobs are
deleted from the filesystem. Because the scheduling of the background
task cannot be controlled from outside the cache, the test polls the
filesystem. More precisely, the test uses BlobUtils#getStorageLocation()
to build the path on the filesystem given a blobkey and tests the
existence of that path in regular intervals. As a side effect, however,
BlobUtils#getStorageLocation() also creates all necessary directories to
that path if they do not exist yet. This leads to a situation where
directories and concurrently deleted and created, which can cause
FileAlreadyExists exceptions. This commit fixes the issue.

Note that the above applies to all tests that invoke
BlobServerCleanupTest#checkFilesExist().

This closes apache#12376.
  • Loading branch information
GJL committed May 28, 2020
1 parent a835f31 commit c22d01d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public AbstractBlobCache(
this.serverAddress = serverAddress;
}

public File getStorageDir() {
return storageDir;
}

/**
* Returns local copy of the file for the BLOB with the given key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ public BlobServer(Configuration config, BlobStore blobStore) throws IOException
// Path Accessors
// --------------------------------------------------------------------------------------------

public File getStorageDir() {
return storageDir;
}

/**
* Returns a file handle to the file associated with the given blob key on the blob
* server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,22 @@ public static <T> int checkFilesExist(
int numFiles = 0;

for (BlobKey key : keys) {
final File blobFile;
final File storageDir;
if (blobService instanceof BlobServer) {
BlobServer server = (BlobServer) blobService;
blobFile = server.getStorageLocation(jobId, key);
storageDir = server.getStorageDir();
} else if (blobService instanceof PermanentBlobCache) {
PermanentBlobCache cache = (PermanentBlobCache) blobService;
blobFile = cache.getStorageLocation(jobId, key);
storageDir = cache.getStorageDir();
} else if (blobService instanceof TransientBlobCache) {
TransientBlobCache cache = (TransientBlobCache) blobService;
blobFile = cache.getStorageLocation(jobId, key);
storageDir = cache.getStorageDir();
} else {
throw new UnsupportedOperationException(
"unsupported BLOB service class: " + blobService.getClass().getCanonicalName());
}

final File blobFile = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
if (blobFile.exists()) {
++numFiles;
} else if (doThrow) {
Expand Down

0 comments on commit c22d01d

Please sign in to comment.