Skip to content

Commit

Permalink
[FLINK-7643] [core] Misc. cleanups in FileSystem
Browse files Browse the repository at this point in the history
  - Simplify access to local file system
  - Use a fair lock for all FileSystem.get() operations
  - Robust falback to local fs for default scheme (avoids URI parsing error on Windows)
  - Deprecate 'getDefaultBlockSize()'
  - Deprecate create(...) with block sizes and replication factor, which is not applicable to many FS
  • Loading branch information
StephanEwen committed Oct 5, 2017
1 parent 84a07a3 commit 3b78684
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 60 deletions.
133 changes: 80 additions & 53 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.OperatingSystem;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -194,6 +193,8 @@ public enum WriteMode {
OVERWRITE
}

// ------------------------------------------------------------------------
// File System Implementation Classes
// ------------------------------------------------------------------------

private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
Expand All @@ -202,57 +203,40 @@ public enum WriteMode {

private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";

// ------------------------------------------------------------------------

/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);

// ------------------------------------------------------------------------

/** Object used to protect calls to specific methods.*/
private static final Object SYNCHRONIZATION_OBJECT = new Object();
private static final ReentrantLock LOCK = new ReentrantLock(true);

/**
* Data structure mapping file system keys (scheme + authority) to cached file system objects.
*/
private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
/** Cache for file systems, by scheme + authority. */
private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();

/**
* Data structure mapping file system schemes to the corresponding implementations
*/
private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
/** Mapping of file system schemes to the corresponding implementations */
private static final Map<String, String> FSDIRECTORY = new HashMap<>();

/** The local file system. Needs to be lazily initialized to avoid that some JVMs deadlock
* on static subclass initialization. */
private static LocalFileSystem LOCAL_FS;

/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:https:///'} or
* {@code 'file:/'}. */
private static URI defaultScheme;

// ------------------------------------------------------------------------
// Initialization
// ------------------------------------------------------------------------

static {
FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
FSDIRECTORY.put("file", LocalFileSystem.class.getName());
}

/**
* Returns a reference to the {@link FileSystem} instance for accessing the
* local file system.
*
* @return a reference to the {@link FileSystem} instance for accessing the
* local file system.
*/
public static FileSystem getLocalFileSystem() {
// this should really never fail.
try {
URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:https:///");
return get(localUri);
}
catch (Exception e) {
throw new RuntimeException("Cannot create URI for local file system");
}
}

/**
* The default filesystem scheme to be used. This can be specified by the parameter
* <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
* set to <code>file:https:///</code> (see {@link ConfigConstants#FILESYSTEM_SCHEME}
* and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem.
*/
private static URI defaultScheme;

/**
* <p>
* Sets the default filesystem scheme based on the user-specified configuration parameter
Expand All @@ -269,26 +253,55 @@ public static FileSystem getLocalFileSystem() {
* @param config the configuration from where to fetch the parameter.
*/
public static void setDefaultScheme(Configuration config) throws IOException {
synchronized (SYNCHRONIZATION_OBJECT) {
LOCK.lock();
try {
if (defaultScheme == null) {
String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
try {
defaultScheme = new URI(stringifiedUri);
} catch (URISyntaxException e) {
throw new IOException("The URI used to set the default filesystem " +
"scheme ('" + stringifiedUri + "') is not valid.");
final String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, null);
if (stringifiedUri == null) {
defaultScheme = LocalFileSystem.getLocalFsURI();
}
else {
try {
defaultScheme = new URI(stringifiedUri);
} catch (URISyntaxException e) {
throw new IOException("The URI used to set the default filesystem " +
"scheme ('" + stringifiedUri + "') is not valid.");
}
}
}
}
finally {
LOCK.unlock();
}
}

// ------------------------------------------------------------------------
// Obtaining File System Instances
// ------------------------------------------------------------------------

/**
* Returns a reference to the {@link FileSystem} instance for accessing the local file system.
*
* @return a reference to the {@link FileSystem} instance for accessing the local file system.
*/
public static FileSystem getLocalFileSystem() {
LOCK.lock();
try {
if (LOCAL_FS == null) {
LOCAL_FS = new LocalFileSystem();
}
return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LOCAL_FS);
} finally {
LOCK.unlock();
}
}

@Internal
public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
FileSystem fs;
final URI asked = uri;

URI asked = uri;
synchronized (SYNCHRONIZATION_OBJECT) {
LOCK.lock();
try {

if (uri.getScheme() == null) {
try {
Expand Down Expand Up @@ -333,6 +346,7 @@ public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
}

// Try to create a new file system
final FileSystem fs;

if (!isFlinkSupportedScheme(uri.getScheme())) {
// no build in support for this file system. Falling back to Hadoop's FileSystem impl.
Expand Down Expand Up @@ -369,9 +383,12 @@ public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
// Add new file system object to cache
CACHE.put(key, fs);
}
}

return fs;
return fs;
}
finally {
LOCK.unlock();
}
}

/**
Expand Down Expand Up @@ -515,7 +532,10 @@ private static Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
* Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
*
* @return the number of bytes that large input files should be optimally be split into to minimize I/O time
*
* @deprecated This value is no longer used and is meaningless.
*/
@Deprecated
public long getDefaultBlockSize() {
return 32 * 1024 * 1024; // 32 MB;
}
Expand Down Expand Up @@ -598,8 +618,15 @@ public boolean exists(final Path f) throws IOException {
* Control the behavior of specific file systems via configurations instead.
*/
@Deprecated
public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize) throws IOException;
public FSDataOutputStream create(
Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws IOException {

return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
}

/**
* Opens an FSDataOutputStream at the indicated Path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,6 @@ public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
return new LocalDataOutputStream(file);
}

@Override
public FSDataOutputStream create(
Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
}


@Override
public boolean rename(final Path src, final Path dst) throws IOException {
final File srcFile = pathToFile(src);
Expand All @@ -289,4 +282,17 @@ public boolean rename(final Path src, final Path dst) throws IOException {
public boolean isDistributedFS() {
return false;
}

// ------------------------------------------------------------------------

/**
* Gets the URI that represents the local file system.
* That URI is {@code "file:/"} on Windows platforms and {@code "file:https:///"} on other
* UNIX family platforms.
*
* @return The URI that represents the local file system.
*/
public static URI getLocalFsURI() {
return uri;
}
}

0 comments on commit 3b78684

Please sign in to comment.