Skip to content

Commit

Permalink
[FLINK-9683][history] HistoryServer uses configured default fs scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxin369 committed Feb 2, 2021
1 parent 0a27d11 commit ad0bcd7
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
Expand Down Expand Up @@ -202,8 +201,7 @@ public HistoryServer(
List<RefreshLocation> refreshDirs = new ArrayList<>();
for (String refreshDirectory : refreshDirectories.split(",")) {
try {
Path refreshPath =
WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
Path refreshPath = new Path(refreshDirectory);
FileSystem refreshFS = refreshPath.getFileSystem();
refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

import java.util.concurrent.CompletableFuture;
Expand All @@ -45,9 +44,7 @@ static HistoryServerArchivist createHistoryServerArchivist(
final String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

if (configuredArchivePath != null) {
final Path archivePath =
WebMonitorUtils.validateAndNormalizeUri(
new Path(configuredArchivePath).toUri());
final Path archivePath = new Path(configuredArchivePath);

return new JsonResponseHistoryServerArchivist(jsonArchivist, archivePath, ioExecutor);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand All @@ -40,7 +39,6 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -222,33 +220,6 @@ public static Map<String, String> fromKeyValueJsonArray(String jsonString) {
}
}

/**
* Checks and normalizes the given URI. This method first checks the validity of the URI (scheme
* and path are not null) and then normalizes the URI to a path.
*
* @param archiveDirUri The URI to check and normalize.
* @return A normalized URI as a Path.
* @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
*/
public static Path validateAndNormalizeUri(URI archiveDirUri) {
final String scheme = archiveDirUri.getScheme();
final String path = archiveDirUri.getPath();

// some validity checks
if (scheme == null) {
throw new IllegalArgumentException(
"The scheme (hdfs:https://, file:https://, etc) is null. "
+ "Please specify the file system scheme explicitly in the URI.");
}
if (path == null) {
throw new IllegalArgumentException(
"The path to store the job archive data in is null. "
+ "Please specify a directory path for the archiving the job data.");
}

return new Path(archiveDirUri);
}

/** Private constructor to prevent instantiation. */
private WebMonitorUtils() {
throw new RuntimeException();
Expand Down

0 comments on commit ad0bcd7

Please sign in to comment.