[FLINK-7768] [core] Load File Systems via Java Service abstraction
This changes the discovery mechanism of file systems from static class name configurations
to a service mechanism (META-INF/services).

As part of that, it factors HDFS and MapR FS implementations into separate modules.

With this change, users can add new filesystem implementations and make them available
by simply adding them to the class path.
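As an illustration, a third-party file system could be wired in along the following lines. This is a minimal sketch, not code from this commit: the scheme 'myfs', the package com.example.fs, and the class MyFileSystemFactory are made up, the factory simply hands back the local file system, and only the FileSystemFactory methods introduced or used by this change (getScheme(), configure(Configuration), create(URI)) are assumed.

package com.example.fs; // hypothetical package, for illustration only

import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

/**
 * A hypothetical factory for a 'myfs://' scheme, discovered through the
 * ServiceLoader-based lookup introduced in this commit.
 */
public class MyFileSystemFactory implements FileSystemFactory {

    @Override
    public String getScheme() {
        return "myfs";
    }

    @Override
    public void configure(Configuration config) {
        // this sketch takes no configuration
    }

    @Override
    public FileSystem create(URI fsUri) {
        // for illustration only: delegate to the local file system;
        // a real implementation would create its own FileSystem here
        return FileSystem.getLocalFileSystem();
    }
}

The matching service descriptor is a plain text file on the class path that lists the factory's fully qualified name, one entry per line:

# META-INF/services/org.apache.flink.core.fs.FileSystemFactory
com.example.fs.MyFileSystemFactory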
StephanEwen committed Oct 6, 2017
1 parent bad3df5 commit 77e3701
Showing 38 changed files with 1,516 additions and 689 deletions.
125 changes: 113 additions & 12 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -30,10 +30,12 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.factories.HadoopFileSystemFactoryLoader;
import org.apache.flink.core.fs.factories.MapRFsFactory;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.factories.LocalFileSystemFactory;
import org.apache.flink.core.fs.local.LocalFileSystemFactory;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.File;
@@ -42,7 +44,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -198,6 +201,9 @@ public enum WriteMode {

// ------------------------------------------------------------------------

/** Logger for all FileSystem work */
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
@@ -206,27 +212,23 @@ public enum WriteMode {
private static final ReentrantLock LOCK = new ReentrantLock(true);

/** Cache for file systems, by scheme + authority. */
private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();
private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();

/** Mapping of file system schemes to the corresponding implementation factories. */
private static final Map<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
private static final HashMap<String, FileSystemFactory> FS_FACTORIES = loadFileSystems();

/** The default factory that is used when no scheme matches. */
private static final FileSystemFactory FALLBACK_FACTORY = HadoopFileSystemFactoryLoader.loadFactory();
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file system's scheme {@code 'file:///'} or {@code 'file:/'}. */
private static URI DEFAULT_SCHEME;


// ------------------------------------------------------------------------
// Initialization
// ------------------------------------------------------------------------

static {
FS_FACTORIES.put("file", new LocalFileSystemFactory());
FS_FACTORIES.put("maprfs", new MapRFsFactory());
}

/**
* Initializes the shared file system settings.
*
@@ -892,6 +894,105 @@ public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean crea
}
}

// ------------------------------------------------------------------------

/**
* Loads the factories for the file systems directly supported by Flink.
* Aside from the {@link LocalFileSystem}, these file systems are loaded
* via Java's service framework.
*
* @return A map from the file system scheme to corresponding file system factory.
*/
private static HashMap<String, FileSystemFactory> loadFileSystems() {
final HashMap<String, FileSystemFactory> map = new HashMap<>();

// by default, we always have the local file system factory
map.put("file", new LocalFileSystemFactory());

LOG.debug("Loading extension file systems via services");

try {
ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class);
Iterator<FileSystemFactory> iter = serviceLoader.iterator();

// we explicitly use an iterator here (rather than for-each) because that way
// we can catch errors in individual service instantiations

//noinspection WhileLoopReplaceableByForEach
while (iter.hasNext()) {
try {
FileSystemFactory factory = iter.next();
String scheme = factory.getScheme();
map.put(scheme, factory);
LOG.debug("Added file system {}:{}", scheme, factory.getClass().getName());
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load a file systems via services", t);
}
}
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load additional file systems via services", t);
}

return map;
}

/**
* Utility loader for the Hadoop file system factory.
* We treat the Hadoop FS factory in a special way, because we use it as a
* catch-all for file system schemes not supported directly in Flink.
*
* <p>This method does a set of eager checks for availability of certain classes, to
* be able to give better error messages.
*/
private static FileSystemFactory loadHadoopFsFactory() {
final ClassLoader cl = FileSystem.class.getClassLoader();

// first, see if the Flink runtime classes are available
final Class<? extends FileSystemFactory> factoryClass;
try {
factoryClass = Class.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl).asSubclass(FileSystemFactory.class);
}
catch (ClassNotFoundException e) {
LOG.info("No Flink runtime dependency present. " +
"The extended set of supported File Systems via Hadoop is not available.");
return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
}
catch (Exception | LinkageError e) {
LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
}

// check eagerly (for better exception messages) whether the Hadoop classes are available here
try {
Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
Class.forName("org.apache.hadoop.fs.FileSystem", false, cl);
}
catch (ClassNotFoundException e) {
LOG.info("Hadoop is not in the classpath/dependencies. " +
"The extended set of supported File Systems via Hadoop is not available.");
return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
}

// Create the factory.
try {
return factoryClass.newInstance();
}
catch (Exception | LinkageError e) {
LOG.warn("Flink's Hadoop file system factory could not be created", e);
return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
}
}

// ------------------------------------------------------------------------

/**
* An identifier of a file system, via its scheme and its authority.
*/
flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
Expand Up @@ -33,6 +33,11 @@
@PublicEvolving
public interface FileSystemFactory {

/**
* Gets the scheme of the file system created by this factory.
*/
String getScheme();

/**
* Applies the given configuration to this factory. All future file system
* instantiations via {@link #create(URI)} should take the configuration into
flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.core.fs.factories;
package org.apache.flink.core.fs;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
@@ -32,7 +33,8 @@
/**
* A file system factory to throw an UnsupportedFileSystemSchemeException when called.
*/
public class UnsupportedSchemeFactory implements FileSystemFactory {
@Internal
class UnsupportedSchemeFactory implements FileSystemFactory {

private final String exceptionMessage;

@@ -48,6 +50,11 @@ public UnsupportedSchemeFactory(String exceptionMessage, @Nullable Throwable exc
this.exceptionCause = exceptionCause;
}

@Override
public String getScheme() {
return "n/a";
}

@Override
public void configure(Configuration config) {
// nothing to do here

This file was deleted.

flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.core.fs.factories;
package org.apache.flink.core.fs.local;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +32,11 @@
@PublicEvolving
public class LocalFileSystemFactory implements FileSystemFactory {

@Override
public String getScheme() {
return LocalFileSystem.getLocalFsURI().getScheme();
}

@Override
public void configure(Configuration config) {
// the local file system takes no configuration, so nothing to do here
@@ -75,12 +75,6 @@ public class DelimitedInputFormatSamplingTest {

@BeforeClass
public static void initialize() {
try {
TestFileSystem.registerTestFileSysten();
} catch (Throwable t) {
Assert.fail("Could not setup the mock test filesystem.");
}

try {
// make sure we do 4 samples
CONFIG = TestConfigUtils.loadGlobalConf(
flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,9 +19,7 @@
package org.apache.flink.testutils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -32,8 +30,14 @@
import org.apache.flink.core.fs.local.LocalFileStatus;
import org.apache.flink.core.fs.local.LocalFileSystem;

/**
* A test file system. This also has a service entry in the test
* resources, to be loaded during tests.
*/
public class TestFileSystem extends LocalFileSystem {


public static final String SCHEME = "test";

private static int streamOpenCounter;

public static int getNumtimeStreamOpened() {
@@ -74,24 +78,17 @@ public FileStatus[] listStatus(Path f) throws IOException {

@Override
public URI getUri() {
return URI.create("test:https:///");
}

public static void registerTestFileSysten() throws Exception {
Class<FileSystem> fsClass = FileSystem.class;
Field dirField = fsClass.getDeclaredField("FS_FACTORIES");

dirField.setAccessible(true);
@SuppressWarnings("unchecked")
Map<String, FileSystemFactory> map = (Map<String, FileSystemFactory>) dirField.get(null);
dirField.setAccessible(false);

map.put("test", new TestFileSystemFactory());
return URI.create(SCHEME + ":https:///");
}

// ------------------------------------------------------------------------

private static final class TestFileSystemFactory implements FileSystemFactory {
public static final class TestFileSystemFactory implements FileSystemFactory {

@Override
public String getScheme() {
return SCHEME;
}

@Override
public void configure(Configuration config) {}
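The service entry mentioned in the TestFileSystem Javadoc is not visible in this part of the diff. For java.util.ServiceLoader to find the nested factory, it would presumably be a test-resources file along these lines (the exact location is an assumption; nested classes are listed under their binary name, with a '$'):

# assumed location: flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory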