Skip to content

Commit

Permalink
[FLINK-14481]Modify the Flink valid socket port check to 0 to 65535. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
AT-Fieldless authored and pnowojski committed Nov 22, 2019
1 parent f01cdd1 commit 6dbb308
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 12 deletions.
28 changes: 24 additions & 4 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public static String unresolvedHostToNormalizedString(String host) {
* @return host:port where host will be normalized if it is an IPv6 address
*/
public static String unresolvedHostAndPortToNormalizedString(String host, int port) {
Preconditions.checkArgument(port >= 0 && port < 65536,
Preconditions.checkArgument(isValidHostPort(port),
"Port is not within the valid range,");
return unresolvedHostToNormalizedString(host) + ":" + port;
}
Expand Down Expand Up @@ -350,20 +350,20 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) t
if (dashIdx == -1) {
// only one port in range:
final int port = Integer.valueOf(range);
if (port < 0 || port > 65535) {
if (!isValidHostPort(port)) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + port + ".");
}
rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
} else {
// evaluate range
final int start = Integer.valueOf(range.substring(0, dashIdx));
if (start < 0 || start > 65535) {
if (!isValidHostPort(start)) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + start + ".");
}
final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
if (end < 0 || end > 65535) {
if (!isValidHostPort(end)) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + end + ".");
}
Expand Down Expand Up @@ -430,4 +430,24 @@ public static String getWildcardIPAddress() {
public interface SocketFactory {
ServerSocket createSocket(int port) throws IOException;
}

/**
* Check whether the given port is in right range when connecting to somewhere.
*
* @param port the port to check
* @return true if the number in the range 1 to 65535
*/
public static boolean isValidClientPort(int port) {
return 1 <= port && port <= 65535;
}

/**
* check whether the given port is in right range when getting port from local system.
*
* @param port the port to check
* @return true if the number in the range 0 to 65535
*/
public static boolean isValidHostPort(int port) {
return 0 <= port && port <= 65535;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.NetUtils;

import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
Expand Down Expand Up @@ -71,7 +72,7 @@ public InfluxdbReporter() {
public void open(MetricConfig config) {
String host = getString(config, HOST);
int port = getInteger(config, PORT);
if (!isValidHost(host) || !isValidPort(port)) {
if (!isValidHost(host) || !NetUtils.isValidClientPort(port)) {
throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
}
String database = getString(config, DB);
Expand Down Expand Up @@ -153,7 +154,4 @@ private static boolean isValidHost(String host) {
return host != null && !host.isEmpty();
}

private static boolean isValidPort(int port) {
return 0 < port && port <= 65535;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -120,8 +121,8 @@ public QueryableStateClient(final String remoteHostname, final int remotePort) t
* @param remotePort the port of the proxy to connect to.
*/
public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) {
Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65536,
"Remote Port " + remotePort + " is out of valid port range (0-65536).");
Preconditions.checkArgument(NetUtils.isValidHostPort(remotePort),
"Remote Port " + remotePort + " is out of valid port range [0-65535].");

this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.net.SSLUtils;

import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,7 +64,7 @@ public NettyConfig(

this.serverAddress = checkNotNull(serverAddress);

checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
checkArgument(NetUtils.isValidHostPort(serverPort), "Invalid port number.");
this.serverPort = serverPort;

checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
Expand Down Expand Up @@ -229,7 +230,7 @@ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extend
Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion) throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
Preconditions.checkArgument(NetUtils.isValidHostPort(targetPort), "The target port " + targetPort + " is not in the range [0, 65535].");
Preconditions.checkNotNull(messageHeaders);
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(messageParameters);
Expand Down

0 comments on commit 6dbb308

Please sign in to comment.