Skip to content

Commit

Permalink
[FLINK-12042] Fix RocksDBStateBackend mistaken usage of the default f…
Browse files Browse the repository at this point in the history
…ilesystem

[FLINK-12042] Restrict only temporary snapshot directories to the local file system by changing the signature to accept files as parameter

This closes apache#8068.
  • Loading branch information
link3280 authored and tillrohrmann committed Apr 27, 2019
1 parent 14c9e9a commit fa3f761
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -154,11 +155,11 @@ public String toString() {
}

/**
* Creates a temporary snapshot directory for the given path. This will always return "null" as result of
* Creates a local temporary snapshot directory for the given path. This will always return "null" as result of
* {@link #completeSnapshotAndGetHandle()} and always attempt to delete the underlying directory in
* {@link #cleanup()}.
*/
public static SnapshotDirectory temporary(@Nonnull Path directory) throws IOException {
public static SnapshotDirectory temporary(@Nonnull File directory) throws IOException {
return new TemporarySnapshotDirectory(directory);
}

Expand All @@ -172,8 +173,8 @@ public static SnapshotDirectory permanent(@Nonnull Path directory) throws IOExce

private static class TemporarySnapshotDirectory extends SnapshotDirectory {

TemporarySnapshotDirectory(@Nonnull Path directory) throws IOException {
super(directory);
TemporarySnapshotDirectory(@Nonnull File directory) throws IOException {
super(new Path(directory.toURI()), FileSystem.getLocalFileSystem());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.state;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -79,13 +81,29 @@ public void exists() throws Exception {
File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));

Assert.assertFalse(folderA.isDirectory());
Path path = new Path(folderA.toURI());
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
Assert.assertFalse(snapshotDirectory.exists());
Path pathA = new Path(folderA.toURI());
SnapshotDirectory snapshotDirectoryA = SnapshotDirectory.permanent(pathA);
Assert.assertFalse(snapshotDirectoryA.exists());
Assert.assertTrue(folderA.mkdirs());
Assert.assertTrue(snapshotDirectory.exists());
Assert.assertTrue(snapshotDirectoryA.exists());
Assert.assertTrue(folderA.delete());
Assert.assertFalse(snapshotDirectory.exists());
Assert.assertFalse(snapshotDirectoryA.exists());

// ensure that snapshot directory will always use the local file system instead of the default file system
Configuration configuration = new Configuration();
configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "nonexistfs:https:///");
FileSystem.initialize(configuration);
File folderB = new File(folderRoot, String.valueOf(UUID.randomUUID()));
// only pass the path and leave the scheme missing
SnapshotDirectory snapshotDirectoryB = SnapshotDirectory.temporary(folderB);
Assert.assertTrue(snapshotDirectoryB.getFileSystem().equals(FileSystem.getLocalFileSystem()));
Assert.assertFalse(snapshotDirectoryB.exists());
Assert.assertTrue(folderB.mkdirs());
Assert.assertTrue(snapshotDirectoryB.exists());
Assert.assertTrue(folderB.delete());
Assert.assertFalse(snapshotDirectoryB.exists());
// restore the FileSystem configuration
FileSystem.initialize(new Configuration());
}

/**
Expand Down Expand Up @@ -194,8 +212,7 @@ public void temporary() throws Exception {
File folderRoot = temporaryFolder.getRoot();
File folder = new File(folderRoot, String.valueOf(UUID.randomUUID()));
Assert.assertTrue(folder.mkdirs());
Path folderPath = new Path(folder.toURI());
SnapshotDirectory tmpSnapshotDirectory = SnapshotDirectory.temporary(folderPath);
SnapshotDirectory tmpSnapshotDirectory = SnapshotDirectory.temporary(folder);
// temporary snapshot directories should not return a handle, because they will be deleted.
Assert.assertNull(tmpSnapshotDirectory.completeSnapshotAndGetHandle());
// check that the directory is deleted even after we called #completeSnapshotAndGetHandle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throw
}
} else {
// create a "temporary" snapshot directory because local recovery is inactive.
Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
return SnapshotDirectory.temporary(path);
File snapshotDir = new File(instanceBasePath, "chk-" + checkpointId);
return SnapshotDirectory.temporary(snapshotDir);
}
}

Expand Down

0 comments on commit fa3f761

Please sign in to comment.