Skip to content

Commit

Permalink
[FLINK-7643] [core] Drop eager checks for file system support.
Browse files Browse the repository at this point in the history
Some places validate if the file URIs are resolvable on the client. This leads to
problems when file systems are not accessible from the client, when the full libraries for
the file systems are not present on the client (as is often the case in cloud setups),
or when the configuration on the client is different from the nodes/containers that will
execute the application.
  • Loading branch information
StephanEwen committed Oct 5, 2017
1 parent 3b78684 commit a5ef09b
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 58 deletions.
11 changes: 0 additions & 11 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -406,17 +406,6 @@ public static FileSystem get(URI uri) throws IOException {
return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}

/**
 * Checks whether Flink has built-in support for the given file system scheme.
 *
 * <p>The check is a plain key lookup of the scheme in {@code FSDIRECTORY}.
 *
 * @param scheme
 *         the file system scheme to look up
 * @return {@code true} if {@code FSDIRECTORY} contains the scheme as a key,
 *         {@code false} otherwise
 */
public static boolean isFlinkSupportedScheme(String scheme) {
	final boolean hasBuiltInSupport = FSDIRECTORY.containsKey(scheme);
	return hasBuiltInSupport;
}

//Class must implement Hadoop FileSystem interface. The class is not available in 'flink-core'.
private static FileSystem instantiateHadoopFileSystemWrapper(Class<?> wrappedFileSystem) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand All @@ -53,8 +50,6 @@ public class FsStateBackend extends AbstractStateBackend {

private static final long serialVersionUID = -8191916350224044011L;

private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);

/** By default, state smaller than 1024 bytes will not be written to files, but
* will be stored directly with the metadata */
public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
Expand Down Expand Up @@ -341,7 +336,7 @@ public String toString() {
* @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
* @throws IOException Thrown, if no file system can be found for the URI's scheme.
*/
public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
private static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
final String scheme = checkpointDataUri.getScheme();
final String path = checkpointDataUri.getPath();

Expand All @@ -358,41 +353,6 @@ public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOExcep
throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
}

if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) {
// skip verification checks for non-flink supported filesystem
// this is because the required filesystem classes may not be available to the flink client
return new Path(checkpointDataUri);
} else {
// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
// (distributed) filesystem on all hosts and includes full host/port information, even if the
// original URI did not include that. We count on the filesystem loading from the configuration
// to fill in the missing data.

// try to grab the file system for this path/URI
FileSystem filesystem = FileSystem.get(checkpointDataUri);
if (filesystem == null) {
String reason = "Could not find a file system for the given scheme in" +
"the available configurations.";
LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
"problem or by the fact that the file system is not accessible from the " +
"client. Reason:{}", reason);
return new Path(checkpointDataUri);
}

URI fsURI = filesystem.getUri();
try {
URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
return new Path(baseURI);
} catch (URISyntaxException e) {
String reason = String.format(
"Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(),
checkpointDataUri,
fsURI);
LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
"problem or by the fact that the file system is not accessible from the " +
"client. Reason: {}", reason);
return new Path(checkpointDataUri);
}
}
return new Path(checkpointDataUri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,12 @@ class MemoryArchivist(
throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
}

if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
// skip verification checks for non-flink supported filesystem
// this is because the required filesystem classes may not be available to the flink client
throw new IllegalArgumentException("No file system found with scheme " + scheme
+ ", referenced in file URI '" + archivePathUri.toString + "'.")
try {
FileSystem.get(archivePathUri)
}
catch {
case e: Exception =>
throw new IllegalArgumentException(s"No file system found for URI '${archivePathUri}'.")
}
new Path(archivePathUri)
}
Expand Down

0 comments on commit a5ef09b

Please sign in to comment.