Skip to content

Commit

Permalink
[FLINK-1266] Properly pass the fs.defaulFS setting when initializing …
Browse files Browse the repository at this point in the history
…filesystems
  • Loading branch information
rmetzger committed Jan 8, 2015
1 parent 0af4d3a commit c024d81
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ public URI getUri() {
@Override
public void initialize(URI path) throws IOException {

// For HDFS we have to have an authority
if (path.getAuthority() == null && path.getScheme().equals("hdfs")) {
// If the authority is not part of the path, we initialize with the fs.defaultFS entry.
if (path.getAuthority() == null) {

String configEntry = this.conf.get("fs.defaultFS", null);
if (configEntry == null) {
Expand All @@ -277,31 +277,29 @@ public void initialize(URI path) throws IOException {
}

if (configEntry == null) {
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
"or that configuration did not contain an entry for the default hdfs.");
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system (hdfs) configuration was registered, " +
"or that configuration did not contain an entry for the default file system (usually 'fs.defaultFS').");
} else {
try {
URI initURI = URI.create(configEntry);

if (initURI.getAuthority() == null) {
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
"or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port.");
} else if (!initURI.getScheme().equalsIgnoreCase("hdfs")) {
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " +
"or the provided configuration describes a file system with scheme '" + initURI.getScheme() + "' other than the Hadoop Distributed File System (HDFS).");
throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system was registered, " +
"or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) " +
"describing the (hdfs namenode) host and port.");
} else {
try {
this.fs.initialize(initURI, this.conf);
}
catch (IOException e) {
throw new IOException(getMissingAuthorityErrorPrefix(path) +
"Could not initialize the file system connection with the given address of the HDFS NameNode: " + e.getMessage(), e);
"Could not initialize the file system connection with the given default file system address: " + e.getMessage(), e);
}
}
}
catch (IllegalArgumentException e) {
throw new IOException(getMissingAuthorityErrorPrefix(path) +
"The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + configEntry);
"The configuration contains an invalid file system default name (fs.default.name or fs.defaultFS): " + configEntry);
}
}
}
Expand Down

0 comments on commit c024d81

Please sign in to comment.