Skip to content

Commit

Permalink
[FLINK-7643] [core] Rework FileSystem loading to use factories
Browse files Browse the repository at this point in the history
This makes sure that configurations are loaded once and file system instances are
properly reused by scheme and authority.

This also factors out a lot of the special treatment of Hadoop file systems and simply
makes the Hadoop File System factory the default fallback factory.
  • Loading branch information
StephanEwen committed Oct 6, 2017
1 parent a5ef09b commit 536675b
Show file tree
Hide file tree
Showing 26 changed files with 936 additions and 799 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public CliFrontend(String configDir) throws Exception {
this.config = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());

try {
FileSystem.setDefaultScheme(config);
FileSystem.initialize(config);
} catch (IOException e) {
throw new Exception("Error while setting the default " +
"filesystem scheme from configuration.", e);
Expand Down
3 changes: 3 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ under the License.
<excludes combine.children="append">
<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
<exclude>org.apache.flink.core.fs.FileSystem\$FSKey</exclude>
<exclude>org.apache.flink.core.fs.FileSystem#initialize(java.net.URI)</exclude>
<exclude>org.apache.flink.core.fs.FileSystem#isFlinkSupportedScheme(java.lang.String)</exclude>
<exclude>org.apache.flink.core.fs.FileSystem#setDefaultScheme(org.apache.flink.configuration.Configuration)</exclude>
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
<!-- Breaking changes between 1.1 and 1.2.
We ignore these changes because these are low-level, internal runtime configuration parameters -->
Expand Down
304 changes: 126 additions & 178 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.fs;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

import java.io.IOException;
import java.net.URI;

/**
* A factory to create file systems.
*
* <p>The factory is typically configured via {@link #configure(Configuration)} before
* creating file systems via {@link #create(URI)}.
*
* <p>Implementations that need no configuration may leave {@link #configure(Configuration)}
* empty.
*/
@PublicEvolving
public interface FileSystemFactory {

/**
* Applies the given configuration to this factory. All future file system
* instantiations via {@link #create(URI)} should take the configuration into
* account.
*
* <p>This method declares no checked exceptions.
*
* @param config The configuration to apply.
*/
void configure(Configuration config);

/**
* Creates a new file system for the given file system URI.
* The URI describes the type of file system (via its scheme) and optionally the
* authority (for example the host) of the file system.
*
* <p>NOTE(review): not every implementation hands out a distinct object per call
* (the factory for the local file system returns a shared instance), so callers
* should presumably not rely on instance identity - confirm against the callers.
*
* @param fsUri The URI that describes the file system.
* @return A new instance of the specified file system.
*
* @throws IOException Thrown if the file system could not be instantiated.
*/
FileSystem create(URI fsUri) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public URI getUri() {
return unsafeFileSystem.getUri();
}

@Override
public void initialize(URI name) throws IOException {
unsafeFileSystem.initialize(name);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
return unsafeFileSystem.getFileStatus(f);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.fs;

import java.io.IOException;

/**
 * Thrown to signal that the scheme of a file system URI is not supported.
 *
 * <p>The exception is an {@link IOException}, so code that already handles I/O failures
 * handles it as well.
 */
public class UnsupportedFileSystemSchemeException extends IOException {

	private static final long serialVersionUID = 1L;

	/**
	 * Builds the exception from a detail message and the failure that led to it.
	 *
	 * @param message The exception message
	 * @param cause The exception cause
	 */
	public UnsupportedFileSystemSchemeException(String message, Throwable cause) {
		super(message, cause);
	}

	/**
	 * Builds the exception from a detail message only; no cause is recorded.
	 *
	 * @param message The exception message
	 */
	public UnsupportedFileSystemSchemeException(String message) {
		super(message);
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.fs.factories;

import org.apache.flink.core.fs.FileSystemFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A utility class to check and reflectively load the Hadoop file system factory.
 *
 * <p>The factory class and the Hadoop classes are referenced by name only, so this class
 * itself has no compile-time dependency on either of them.
 */
public class HadoopFileSystemFactoryLoader {

	private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystemFactoryLoader.class);

	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";

	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";

	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";


	/**
	 * Loads the FileSystemFactory for the Hadoop-backed file systems.
	 *
	 * <p>The method never throws for a missing or broken dependency. Every failure to locate,
	 * link, or instantiate the required classes is logged and answered with an
	 * {@code UnsupportedSchemeFactory} that carries the reason.
	 *
	 * @return The Hadoop file system factory, or an {@code UnsupportedSchemeFactory} describing
	 *         why the Hadoop file systems are not available. Never {@code null}.
	 */
	public static FileSystemFactory loadFactory() {
		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();

		// first, see if the Flink runtime classes are available
		final Class<? extends FileSystemFactory> factoryClass;
		try {
			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
		}
		catch (ClassNotFoundException e) {
			LOG.info("No Flink runtime dependency present - the extended set of supported File Systems " +
					"via Hadoop is not available.");
			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
		}
		catch (Exception | LinkageError e) {
			LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
		}

		// check (for eager and better exception messages) if the Hadoop classes are available here
		try {
			Class.forName(HADOOP_CONFIG_CLASS, false, cl);
			Class.forName(HADOOP_FS_CLASS, false, cl);
		}
		catch (ClassNotFoundException e) {
			LOG.info("Hadoop is not in the classpath/dependencies - the extended set of supported File Systems " +
					"via Hadoop is not available.");
			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
		}
		catch (LinkageError e) {
			// Loading without initialization can still fail to link, for example when a supertype
			// is missing or the class file version is unsupported. Treat this like the other
			// linkage failures in this method instead of letting the error escape.
			LOG.warn("Hadoop classes could not be loaded", e);
			return new UnsupportedSchemeFactory("Hadoop classes could not be loaded", e);
		}

		// Create the factory.
		try {
			return factoryClass.newInstance();
		}
		catch (Exception | LinkageError e) {
			LOG.warn("Flink's Hadoop file system factory could not be created", e);
			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;

package org.apache.flink.core.fs.factories;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.local.LocalFileSystem;

import java.net.URI;

/**
* A factory for the {@link LocalFileSystem}.
*/
@PublicEvolving
public interface HadoopFileSystemWrapper {
public class LocalFileSystemFactory implements FileSystemFactory {

/**
* Accepts the configuration without reading it.
*
* @param config The configuration to apply; not used by this factory.
*/
@Override
public void configure(Configuration config) {
// the local file system takes no configuration, so nothing to do here
}

/**
* Test whether the HadoopWrapper can wrap the given file system scheme.
*
* @param scheme The scheme of the file system.
* @return The class implementing the file system.
*/
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme);
/**
* Returns the shared {@link LocalFileSystem} instance.
*
* @param fsUri The URI that describes the file system; not consulted by this factory.
* @return The instance obtained from {@link LocalFileSystem#getSharedInstance()}.
*/
@Override
public FileSystem create(URI fsUri) {
return LocalFileSystem.getSharedInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.fs.factories;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;

/**
 * File system factory for MapR FS.
 *
 * <p>The MapR file system class is looked up by name and instantiated reflectively, so this
 * factory only works when that class and the libraries it links against are in the classpath.
 */
public class MapRFsFactory implements FileSystemFactory {

	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";

	@Override
	public void configure(Configuration config) {
		// this factory does not read anything from the configuration
	}

	/**
	 * Reflectively instantiates the MapR file system through its {@code (URI)} constructor.
	 *
	 * @param fsUri The URI handed to the file system's constructor.
	 * @return The instantiated MapR file system.
	 *
	 * @throws IOException Thrown if the class cannot be found, linked, or instantiated. An
	 *                     {@code IOException} raised by the constructor itself is passed
	 *                     through unchanged.
	 */
	@Override
	public FileSystem create(URI fsUri) throws IOException {
		try {
			final ClassLoader classLoader = getClass().getClassLoader();
			final Class<?> loadedClass = Class.forName(MAPR_FILESYSTEM_CLASS, false, classLoader);
			final Class<? extends FileSystem> maprFsClass = loadedClass.asSubclass(FileSystem.class);
			final Constructor<? extends FileSystem> uriConstructor = maprFsClass.getConstructor(URI.class);

			try {
				return uriConstructor.newInstance(fsUri);
			}
			catch (InvocationTargetException wrapped) {
				// unwrap, so that the handlers below classify the constructor's own failure
				throw wrapped.getTargetException();
			}
		}
		catch (ClassNotFoundException notFound) {
			throw new IOException("Could not load MapR file system class '" + MAPR_FILESYSTEM_CLASS +
					"\'. Please make sure the Flink runtime classes are part of the classpath or dependencies.", notFound);
		}
		catch (LinkageError linkageError) {
			throw new IOException("Some of the MapR FS or required Hadoop classes seem to be missing or incompatible. "
					+ "Please check that a compatible version of the MapR Hadoop libraries is in the classpath.", linkageError);
		}
		catch (IOException ioFailure) {
			// already the declared exception type, so no wrapping
			throw ioFailure;
		}
		catch (Throwable other) {
			throw new IOException("Could not instantiate MapR file system class '" + MAPR_FILESYSTEM_CLASS + "'.", other);
		}
	}
}
Loading

0 comments on commit 536675b

Please sign in to comment.