diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 7c69425ec4f20..672ebbbfbf5ca 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -406,17 +406,6 @@ public static FileSystem get(URI uri) throws IOException { return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } - /** - * Returns a boolean indicating whether a scheme has built-in Flink support. - * - * @param scheme - * a file system scheme - * @return a boolean indicating whether the provided scheme has built-in Flink support - */ - public static boolean isFlinkSupportedScheme(String scheme) { - return FSDIRECTORY.containsKey(scheme); - } - //Class must implement Hadoop FileSystem interface. The class is not avaiable in 'flink-core'. private static FileSystem instantiateHadoopFileSystemWrapper(Class wrappedFileSystem) throws IOException { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index e320bf3140de7..ddfa85c97ed63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -31,12 +31,9 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -53,8 +50,6 @@ public class FsStateBackend extends AbstractStateBackend { private static final long serialVersionUID = -8191916350224044011L; - private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class); - /** By default, state smaller than 1024 bytes will not be written to files, but * will be stored directly with the metadata */ public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024; @@ -341,7 +336,7 @@ public String toString() { * @throws IllegalArgumentException Thrown, if the URI misses scheme or path. * @throws IOException Thrown, if no file system can be found for the URI's scheme. */ - public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException { + private static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException { final String scheme = checkpointDataUri.getScheme(); final String path = checkpointDataUri.getPath(); @@ -358,41 +353,6 @@ public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOExcep throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); } - if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) { - // skip verification checks for non-flink supported filesystem - // this is because the required filesystem classes may not be available to the flink client - return new Path(checkpointDataUri); - } else { - // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same - // (distributed) filesystem on all hosts and includes full host/port information, even if the - // original URI did not include that. We count on the filesystem loading from the configuration - // to fill in the missing data. - - // try to grab the file system for this path/URI - FileSystem filesystem = FileSystem.get(checkpointDataUri); - if (filesystem == null) { - String reason = "Could not find a file system for the given scheme in" + - "the available configurations."; - LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " + - "problem or by the fact that the file system is not accessible from the " + - "client. Reason:{}", reason); - return new Path(checkpointDataUri); - } - - URI fsURI = filesystem.getUri(); - try { - URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); - return new Path(baseURI); - } catch (URISyntaxException e) { - String reason = String.format( - "Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(), - checkpointDataUri, - fsURI); - LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " + - "problem or by the fact that the file system is not accessible from the " + - "client. Reason: {}", reason); - return new Path(checkpointDataUri); - } - } + return new Path(checkpointDataUri); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index c963238cfee59..7a10d01da0e39 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -291,11 +291,12 @@ class MemoryArchivist( throw new IllegalArgumentException("Cannot use the root directory for storing job archives.") } - if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) { - // skip verification checks for non-flink supported filesystem - // this is because the required filesystem classes may not be available to the flink client - throw new IllegalArgumentException("No file system found with scheme " + scheme - + ", referenced in file URI '" + archivePathUri.toString + "'.") + try { + FileSystem.get(archivePathUri) + } + catch { + case e: Exception => + throw new IllegalArgumentException(s"No file system found for URI '${archivePathUri}'.") } new Path(archivePathUri) }