Skip to content

Commit

Permalink
[FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Oct 6, 2017
1 parent 7843c2f commit bad3df5
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand All @@ -34,13 +33,17 @@
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.time.StopWatch;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -279,6 +282,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
/**
* User-defined FileSystem parameters.
*/
@Nullable
private Configuration fsConfig;

/**
Expand Down Expand Up @@ -382,19 +386,10 @@ public void open(Configuration parameters) throws Exception {
* @throws IOException
*/
private void initFileSystem() throws IOException {
if (fs != null) {
return;
if (fs == null) {
Path path = new Path(basePath);
fs = BucketingSink.createHadoopFileSystem(path, fsConfig);
}
org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
if (fsConfig != null) {
String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
hadoopConf.setBoolean(disableCacheName, true);
for (String key : fsConfig.keySet()) {
hadoopConf.set(key, fsConfig.getString(key, null));
}
}

fs = new Path(basePath).getFileSystem(hadoopConf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void open(FileSystem fs, Path path) throws IOException {

CompressionCodec codec = null;

Configuration conf = HadoopFileSystem.getHadoopConfiguration();
Configuration conf = fs.getConf();

if (!compressionCodecName.equals("None")) {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,22 @@
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.time.StopWatch;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -287,6 +292,7 @@ public class BucketingSink<T>
/**
* User-defined FileSystem parameters.
*/
@Nullable
private Configuration fsConfig;

/**
Expand Down Expand Up @@ -402,19 +408,10 @@ public long currentTimeMillis() {
* @throws IOException
*/
private void initFileSystem() throws IOException {
if (fs != null) {
return;
if (fs == null) {
Path path = new Path(basePath);
fs = createHadoopFileSystem(path, fsConfig);
}
org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
if (fsConfig != null) {
String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
hadoopConf.setBoolean(disableCacheName, true);
for (String key : fsConfig.keySet()) {
hadoopConf.set(key, fsConfig.getString(key, null));
}
}

fs = new Path(basePath).getFileSystem(hadoopConf);
}

@Override
Expand Down Expand Up @@ -1113,4 +1110,100 @@ public String toString() {
this.lastWrittenToTime = lastWrittenToTime;
}
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/**
 * Creates a Hadoop {@code FileSystem} for the given path, optionally applying extra
 * user-supplied configuration values on top of the Hadoop configuration that Flink's
 * own file system abstraction already carries.
 *
 * <p>Fast path: if the path resolves to a Flink file system that wraps Hadoop and no
 * extra configuration is given, the wrapped Hadoop file system is returned directly.
 * Otherwise the Hadoop file system is explicitly re-instantiated so that the merged
 * configuration is guaranteed to be applied.
 *
 * @param path the path for which the file system should be created
 * @param extraUserConf user-defined Hadoop configuration entries to overlay, or null
 * @return an initialized Hadoop FileSystem matching the path's scheme and authority
 * @throws IOException if no Hadoop file system can be instantiated for the path
 */
public static FileSystem createHadoopFileSystem(
		Path path,
		@Nullable Configuration extraUserConf) throws IOException {

	// try to get the Hadoop File System via the Flink File Systems
	// that way we get the proper configuration

	final org.apache.flink.core.fs.FileSystem flinkFs =
			org.apache.flink.core.fs.FileSystem.get(path.toUri());
	final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
			((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;

	// fast path: if the Flink file system wraps Hadoop anyways and we need no extra config,
	// then we use it directly
	if (extraUserConf == null && hadoopFs != null) {
		return hadoopFs;
	}
	else {
		// we need to re-instantiate the Hadoop file system, because we either have
		// a special config, or the Path gave us a Flink FS that is not backed by
		// Hadoop (like file://)

		final org.apache.hadoop.conf.Configuration hadoopConf;
		if (hadoopFs != null) {
			// have a Hadoop FS but need to apply extra config
			hadoopConf = hadoopFs.getConf();
		}
		else {
			// the Path gave us a Flink FS that is not backed by Hadoop (like file://)
			// we need to get access to the Hadoop file system first

			// we access the Hadoop FS in Flink, which carries the proper
			// Hadoop configuration. we should get rid of this once the bucketing sink is
			// properly implemented against Flink's FS abstraction

			// the URI is a placeholder: it only needs an "hdfs" scheme so that the
			// Flink FS factory hands back a Hadoop-backed file system; no connection
			// to localhost:12345 is ever made
			URI genericHdfsUri = URI.create("hdfs://localhost:12345/");
			org.apache.flink.core.fs.FileSystem accessor =
					org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(genericHdfsUri);

			if (!(accessor instanceof HadoopFileSystem)) {
				throw new IOException(
						"Cannot instantiate a Hadoop file system to access the Hadoop configuration. " +
						"FS for hdfs:// is " + accessor.getClass().getName());
			}

			hadoopConf = ((HadoopFileSystem) accessor).getHadoopFileSystem().getConf();
		}

		// finalize the configuration: overlay user-supplied entries, if any

		final org.apache.hadoop.conf.Configuration finalConf;
		if (extraUserConf == null) {
			finalConf = hadoopConf;
		}
		else {
			// copy first, so the shared Hadoop configuration object is never mutated
			finalConf = new org.apache.hadoop.conf.Configuration(hadoopConf);

			for (String key : extraUserConf.keySet()) {
				finalConf.set(key, extraUserConf.getString(key, null));
			}
		}

		// we explicitly re-instantiate the file system here in order to make sure
		// that the configuration is applied.

		URI fsUri = path.toUri();
		final String scheme = fsUri.getScheme();
		final String authority = fsUri.getAuthority();

		if (scheme == null && authority == null) {
			// relative path with no scheme: fall back to the configured default FS
			fsUri = FileSystem.getDefaultUri(finalConf);
		}
		else if (scheme != null && authority == null) {
			// scheme but no authority: adopt the default FS authority when schemes match
			URI defaultUri = FileSystem.getDefaultUri(finalConf);
			if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
				fsUri = defaultUri;
			}
		}

		final Class<? extends FileSystem> fsClass = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf);
		final FileSystem fs;
		try {
			fs = fsClass.newInstance();
		}
		catch (Exception e) {
			throw new IOException("Cannot instantiate the Hadoop file system", e);
		}

		fs.initialize(fsUri, finalConf);
		return fs;
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

package org.apache.flink.runtime.fs.hdfs;

import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.HadoopUtils;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -116,6 +114,7 @@ public HadoopDataInputStream open(final Path f) throws IOException {
}

@Override
@SuppressWarnings("deprecation")
public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
Expand Down Expand Up @@ -169,27 +168,4 @@ public long getDefaultBlockSize() {
// Hadoop-backed file systems are always reported as distributed here;
// NOTE(review): the caller-side semantics of this flag are not visible in
// this diff — confirm against Flink's FileSystem#isDistributedFS contract.
public boolean isDistributedFS() {
return true;
}

// ------------------------------------------------------------------------
// Miscellaneous Utilities
// ------------------------------------------------------------------------

/**
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured
* in the main configuration (flink-conf.yaml).
* This method is public because its being used in the HadoopDataSource.
*
* @deprecated This method should not be used, because it dynamically (and possibly incorrectly)
* re-loads the Flink configuration.
* Use {@link HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)}
* instead.
*/
@Deprecated
public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
	// re-loads flink-conf.yaml from disk on every call, then derives the
	// Hadoop configuration from it — the reason this method is deprecated
	final org.apache.flink.configuration.Configuration flinkConfig =
			GlobalConfiguration.loadConfiguration();
	return HadoopUtils.getHadoopConfiguration(flinkConfig);
}
}

0 comments on commit bad3df5

Please sign in to comment.