[FLINK-28112][filesystems] Fix error message when a directly supported file system cannot be handled
gaborgsomogyi authored and MartijnVisser committed Jun 22, 2022
1 parent 97e5914 commit 5bcef81
Showing 2 changed files with 74 additions and 67 deletions.
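In short, the commit rewords the UnsupportedFileSystemSchemeException messages thrown by FileSystem.getUnguardedFileSystem so that a scheme backed by a known plugin (for example s3) points at the plugin(s) and the plugins documentation even when the Hadoop fallback path fails, and it migrates the affected tests to JUnit 5 and AssertJ. A rough sketch of the two error paths follows; it is not part of the commit, the s3 URI, class name, and printed output are illustrative assumptions, and it assumes flink-core on the classpath with no S3 filesystem plugin installed:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;

import java.net.URI;

public class FileSystemErrorMessageDemo {

    public static void main(String[] args) throws Exception {
        // Path 1: no factory is registered for the scheme and no fallback is configured.
        // Without a flink-s3-fs-* plugin installed, the (pre-existing) message lists the
        // matching plugins flink-s3-fs-hadoop / flink-s3-fs-presto and the plugins docs.
        try {
            FileSystem.getUnguardedFileSystem(new URI("s3://bucket/key"));
        } catch (UnsupportedFileSystemSchemeException e) {
            System.out.println(e.getMessage());
        }

        // Path 2: the scheme is allowed to fall back to a Hadoop file system. When that
        // fallback cannot be loaded either, the new catch branch added by this commit
        // still names the plugin(s) instead of the old generic "not directly supported" text.
        Configuration config = new Configuration();
        config.set(CoreOptions.ALLOWED_FALLBACK_FILESYSTEMS, "s3");
        FileSystem.initialize(config, null);
        try {
            FileSystem.getUnguardedFileSystem(new URI("s3://bucket/key"));
        } catch (UnsupportedFileSystemSchemeException e) {
            System.out.println(e.getMessage());
        } finally {
            // Reset the global file system configuration, as the test's finally block does.
            FileSystem.initialize(new Configuration(), null);
        }
    }
}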
38 changes: 25 additions & 13 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -514,26 +514,38 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException
                 throw new UnsupportedFileSystemSchemeException(
                         String.format(
                                 "Could not find a file system implementation for scheme '%s'. The scheme is "
-                                        + "directly supported by Flink through the following plugin%s: %s. Please ensure that each "
-                                        + "plugin resides within its own subfolder within the plugins directory. See https://ci.apache"
-                                        + ".org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to "
+                                        + "directly supported by Flink through the following plugin(s): %s. Please ensure that each "
+                                        + "plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache"
+                                        + ".org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to "
                                         + "use a Hadoop file system for that scheme, please add the scheme to the configuration fs"
                                         + ".allowed-fallback-filesystems. For a full list of supported file systems, "
                                         + "please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.",
-                                uri.getScheme(),
-                                plugins.size() == 1 ? "" : "s",
-                                String.join(", ", plugins)));
+                                uri.getScheme(), String.join(", ", plugins)));
             } else {
                 try {
                     fs = FALLBACK_FACTORY.create(uri);
                 } catch (UnsupportedFileSystemSchemeException e) {
-                    throw new UnsupportedFileSystemSchemeException(
-                            "Could not find a file system implementation for scheme '"
-                                    + uri.getScheme()
-                                    + "'. The scheme is not directly supported by Flink and no Hadoop file system to "
-                                    + "support this scheme could be loaded. For a full list of supported file systems, "
-                                    + "please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.",
-                            e);
+                    if (DIRECTLY_SUPPORTED_FILESYSTEM.containsKey(uri.getScheme())) {
+                        final Collection<String> plugins =
+                                DIRECTLY_SUPPORTED_FILESYSTEM.get(uri.getScheme());
+                        throw new UnsupportedFileSystemSchemeException(
+                                String.format(
+                                        "Could not find a file system implementation for scheme '%s'. File system schemes "
+                                                + "are supported by Flink through the following plugin(s): %s. "
+                                                + "No file system to support this scheme could be loaded. Please ensure that each plugin is "
+                                                + "configured properly and resides within its own subfolder in the plugins directory. "
+                                                + "See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ "
+                                                + "for more information.",
+                                        uri.getScheme(), String.join(", ", plugins)));
+                    } else {
+                        throw new UnsupportedFileSystemSchemeException(
+                                "Could not find a file system implementation for scheme '"
+                                        + uri.getScheme()
+                                        + "'. The scheme is not directly supported by Flink and no Hadoop file system to "
+                                        + "support this scheme could be loaded. For a full list of supported file systems, "
+                                        + "please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.",
+                                e);
+                    }
                 }
             }

103 changes: 49 additions & 54 deletions flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -23,100 +23,95 @@
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.WrappingProxy;
 import org.apache.flink.util.WrappingProxyUtil;
-import org.apache.flink.util.function.ThrowingRunnable;

-import org.hamcrest.Matchers;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;

-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

 /** Tests for the {@link FileSystem} base class. */
-public class FileSystemTest {
+class FileSystemTest {

     @Test
-    public void testGet() throws URISyntaxException, IOException {
+    void testGet() throws URISyntaxException, IOException {
         String scheme = "file";

-        assertTrue(
-                getFileSystemWithoutSafetyNet(scheme + ":///test/test") instanceof LocalFileSystem);
+        assertThat(getFileSystemWithoutSafetyNet(scheme + ":///test/test"))
+                .isInstanceOf(LocalFileSystem.class);

         try {
             getFileSystemWithoutSafetyNet(scheme + "://test/test");
         } catch (IOException ioe) {
-            assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
+            assertThat(ioe.getMessage()).startsWith("Found local file path with authority '");
         }

-        assertTrue(
-                getFileSystemWithoutSafetyNet(scheme + ":/test/test") instanceof LocalFileSystem);
+        assertThat(getFileSystemWithoutSafetyNet(scheme + ":/test/test"))
+                .isInstanceOf(LocalFileSystem.class);

-        assertTrue(getFileSystemWithoutSafetyNet(scheme + ":test/test") instanceof LocalFileSystem);
+        assertThat(getFileSystemWithoutSafetyNet(scheme + ":test/test"))
+                .isInstanceOf(LocalFileSystem.class);

-        assertTrue(getFileSystemWithoutSafetyNet("/test/test") instanceof LocalFileSystem);
+        assertThat(getFileSystemWithoutSafetyNet("/test/test")).isInstanceOf(LocalFileSystem.class);

-        assertTrue(getFileSystemWithoutSafetyNet("test/test") instanceof LocalFileSystem);
+        assertThat(getFileSystemWithoutSafetyNet("test/test")).isInstanceOf(LocalFileSystem.class);
     }

     @Test
-    public void testUnsupportedFS() throws Exception {
-        Exception e = assertThatCode(() -> getFileSystemWithoutSafetyNet("unknownfs://authority/"));
-        assertThat(e, Matchers.instanceOf(UnsupportedFileSystemSchemeException.class));
+    void testUnsupportedFS() {
+        /*
+        exception should be:
+        org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation
+        for scheme 'unknownfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this
+        scheme could be loaded. */
+        assertThatThrownBy(() -> getFileSystemWithoutSafetyNet("unknownfs://authority/"))
+                .isInstanceOf(UnsupportedFileSystemSchemeException.class)
+                .hasMessageContaining("not directly supported")
+                .hasMessageContaining("no Hadoop file system to support this scheme");
     }

     @Test
-    public void testKnownFSWithoutPlugins() throws Exception {
-        Exception e = assertThatCode(() -> getFileSystemWithoutSafetyNet("s3://authority/"));
-        assertThat(e, Matchers.instanceOf(UnsupportedFileSystemSchemeException.class));
+    void testKnownFSWithoutPlugins() {
         /*
         exception should be:
         org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
         system implementation for scheme 's3'. The scheme is directly supported by Flink through the following
         plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own
-        subfolder within the plugins directory. See https://ci.apache
-        .org/projects/flink/flink-docs-master/ops/plugins.html for more information. */
-        assertThat(e.getMessage(), not(containsString("not directly supported")));
-        assertThat(e.getMessage(), containsString("flink-s3-fs-hadoop"));
-        assertThat(e.getMessage(), containsString("flink-s3-fs-presto"));
+        subfolder within the plugins directory.
+        See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. */
+        assertThatThrownBy(() -> getFileSystemWithoutSafetyNet("s3://authority/"))
+                .isInstanceOf(UnsupportedFileSystemSchemeException.class)
+                .hasMessageContaining("is directly supported")
+                .hasMessageContaining("flink-s3-fs-hadoop")
+                .hasMessageContaining("flink-s3-fs-presto")
+                .hasMessageNotContaining("no Hadoop file system to support this scheme");
     }

     @Test
-    public void testKnownFSWithoutPluginsAndException() throws Exception {
+    void testKnownFSWithoutPluginsAndException() {
         try {
             final Configuration config = new Configuration();
             config.set(CoreOptions.ALLOWED_FALLBACK_FILESYSTEMS, "s3;wasb");
-            FileSystem.initialize(config);
+            FileSystem.initialize(config, null);

-            Exception e = assertThatCode(() -> getFileSystemWithoutSafetyNet("s3://authority/"));
-            assertThat(e, Matchers.instanceOf(UnsupportedFileSystemSchemeException.class));
             /*
             exception should be:
-            org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation
-            for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this
-            scheme could be loaded. */
-            assertThat(e.getMessage(), containsString("not directly supported"));
+            org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
+            system implementation for scheme 's3'. File system schemes are supported by Flink through the following
+            plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. No file system to support this scheme could be loaded.
+            Please ensure that each plugin is configured properly and resides within its own subfolder in the plugins directory.
+            See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. */
+            assertThatThrownBy(() -> getFileSystemWithoutSafetyNet("s3://authority/"))
+                    .isInstanceOf(UnsupportedFileSystemSchemeException.class)
+                    .hasMessageContaining("File system schemes are supported")
+                    .hasMessageContaining("flink-s3-fs-hadoop")
+                    .hasMessageContaining("flink-s3-fs-presto")
+                    .hasMessageContaining("Please ensure that each plugin is configured properly");
         } finally {
-            FileSystem.initialize(new Configuration());
+            FileSystem.initialize(new Configuration(), null);
         }
     }
-
-    private static <E extends Throwable> E assertThatCode(ThrowingRunnable<E> runnable) throws E {
-        try {
-            runnable.run();
-            fail("No exception thrown");
-            return null;
-        } catch (Throwable e) {
-            try {
-                return (E) e;
-            } catch (ClassCastException c) {
-                throw e;
-            }
-        }
-    }
 }
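As seen above, the hand-rolled assertThatCode helper and the Hamcrest matchers are replaced by AssertJ's assertThatThrownBy. A minimal standalone sketch of that pattern, not taken from the Flink sources (the exception type and message below are placeholders), assuming AssertJ and JUnit 5 are on the test classpath:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

class ThrownByPatternSketch {

    @Test
    void failingCallIsAssertedFluently() {
        // assertThatThrownBy runs the lambda, fails the test if nothing is thrown, and
        // returns a fluent assertion on the caught Throwable, so the old "run, catch,
        // cast" helper is no longer needed.
        assertThatThrownBy(
                        () -> {
                            throw new IllegalStateException("scheme 'demo' is not supported");
                        })
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("not supported");
    }
}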

