Skip to content

Commit

Permalink
[FLINK-15911][runtime] Support unresolveable external hostname.
Browse files Browse the repository at this point in the history
  • Loading branch information
xintongsong authored and tillrohrmann committed Mar 24, 2020
1 parent 3c99779 commit 9dc7a4a
Show file tree
Hide file tree
Showing 27 changed files with 262 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.slf4j.LoggerFactory;

import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -74,8 +73,8 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
/** The name of the server, useful for debugging. */
private final String serverName;

/** The {@link InetAddress address} to listen to. */
private final InetAddress bindAddress;
/** The address to listen to. */
private final String bindAddress;

/** A port range on which to try to connect. */
private final Set<Integer> bindPortRange;
Expand Down Expand Up @@ -113,7 +112,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
*/
protected AbstractServerBase(
final String serverName,
final InetAddress bindAddress,
final String bindAddress,
final Iterator<Integer> bindPortIterator,
final Integer numEventLoopThreads,
final Integer numQueryThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -57,7 +56,7 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
* Creates the Queryable State Client Proxy.
*
* <p>The server is instantiated using reflection by the
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(String, Iterator, int, int, KvStateRequestStats)
* QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}.
*
* <p>The server needs to be started via {@link #start()} in order to bind
Expand All @@ -70,7 +69,7 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
* @param stats the statistics collector.
*/
public KvStateClientProxyImpl(
final InetAddress bindAddress,
final String bindAddress,
final Iterator<Integer> bindPortIterator,
final Integer numEventLoopThreads,
final Integer numQueryThreads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.util.Preconditions;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
Expand All @@ -51,7 +50,7 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
* Creates the state server.
*
* <p>The server is instantiated using reflection by the
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
* {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(String, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
* QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
*
* <p>The server needs to be started via {@link #start()} in order to bind
Expand All @@ -65,7 +64,7 @@ public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest
* @param stats the statistics collector.
*/
public KvStateServerImpl(
final InetAddress bindAddress,
final String bindAddress,
final Iterator<Integer> bindPortIterator,
final Integer numEventLoopThreads,
final Integer numQueryThreads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class KvStateClientProxyImplTest extends TestLogger {
@Before
public void setup() {
kvStateClientProxy = new KvStateClientProxyImpl(
InetAddress.getLoopbackAddress(),
InetAddress.getLoopbackAddress().getHostName(),
Collections.singleton(0).iterator(),
1,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private static class TestServer extends AbstractServerBase<TestMessage, TestMess
private final KvStateRequestStats requestStats;

TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort) throws UnknownHostException {
super(name, InetAddress.getLocalHost(), bindPort, 1, 1);
super(name, InetAddress.getLocalHost().getHostName(), bindPort, 1, 1);
this.requestStats = stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ public void testClientServerIntegration() throws Throwable {
registry[i] = new KvStateRegistry();
serverStats[i] = new AtomicKvStateRequestStats();
server[i] = new KvStateServerImpl(
InetAddress.getLocalHost(),
InetAddress.getLocalHost().getHostName(),
Collections.singletonList(0).iterator(),
numServerEventLoopThreads,
numServerQueryThreads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class KvStateServerHandlerTest extends TestLogger {
public static void setup() {
try {
testServer = new KvStateServerImpl(
InetAddress.getLocalHost(),
InetAddress.getLocalHost().getHostName(),
Collections.singletonList(0).iterator(),
1,
1,
Expand Down Expand Up @@ -413,7 +413,7 @@ public void testQueryExecutorShutDown() throws Throwable {
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();

KvStateServerImpl localTestServer = new KvStateServerImpl(
InetAddress.getLocalHost(),
InetAddress.getLocalHost().getHostName(),
Collections.singletonList(0).iterator(),
1,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testSimpleRequest() throws Throwable {
KvStateRequestStats stats = new AtomicKvStateRequestStats();

server = new KvStateServerImpl(
InetAddress.getLocalHost(),
InetAddress.getLocalHost().getHostName(),
Collections.singletonList(0).iterator(),
1,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
Expand Down Expand Up @@ -571,9 +572,21 @@ private void releaseEmptyTaskManager(ResourceID resourceId) {
@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
final Time timeout) {

final TaskManagerLocation taskManagerLocation;
try {
taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
} catch (Throwable throwable) {
final String errMsg = String.format(
"Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s",
unresolvedTaskManagerLocation.getExternalAddress(),
throwable.getMessage());
log.error(errMsg);
return CompletableFuture.completedFuture(new RegistrationResponse.Decline(errMsg));
}

final ResourceID taskManagerId = taskManagerLocation.getResourceID();

if (registeredTaskManagers.containsKey(taskManagerId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -167,13 +167,13 @@ void failSlot(final ResourceID taskManagerId,
* Registers the task manager at the job manager.
*
* @param taskManagerRpcAddress the rpc address of the task manager
* @param taskManagerLocation location of the task manager
* @param unresolvedTaskManagerLocation unresolved location of the task manager
* @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or not
*/
CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
@RpcTimeout final Time timeout);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.util.Iterator;

/**
Expand Down Expand Up @@ -54,7 +53,7 @@ public final class QueryableStateUtils {
* @return the {@link KvStateClientProxy client proxy}.
*/
public static KvStateClientProxy createKvStateClientProxy(
final InetAddress address,
final String address,
final Iterator<Integer> ports,
final int eventLoopThreads,
final int queryThreads,
Expand All @@ -70,7 +69,7 @@ public static KvStateClientProxy createKvStateClientProxy(
String classname = "org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl";
Class<? extends KvStateClientProxy> clazz = Class.forName(classname).asSubclass(KvStateClientProxy.class);
Constructor<? extends KvStateClientProxy> constructor = clazz.getConstructor(
InetAddress.class,
String.class,
Iterator.class,
Integer.class,
Integer.class,
Expand Down Expand Up @@ -108,7 +107,7 @@ public static KvStateClientProxy createKvStateClientProxy(
* @return the {@link KvStateServer state server}.
*/
public static KvStateServer createKvStateServer(
final InetAddress address,
final String address,
final Iterator<Integer> ports,
final int eventLoopThreads,
final int queryThreads,
Expand All @@ -126,7 +125,7 @@ public static KvStateServer createKvStateServer(
String classname = "org.apache.flink.queryablestate.server.KvStateServerImpl";
Class<? extends KvStateServer> clazz = Class.forName(classname).asSubclass(KvStateServer.class);
Constructor<? extends KvStateServer> constructor = clazz.getConstructor(
InetAddress.class,
String.class,
Iterator.class,
Integer.class,
Integer.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class JobLeaderService {
private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);

/** Self's location, used for the job manager connection. */
private final TaskManagerLocation ownLocation;
private final UnresolvedTaskManagerLocation ownLocation;

/** The leader retrieval service and listener for each registered job. */
private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
Expand All @@ -87,7 +87,7 @@ public class JobLeaderService {
private JobLeaderListener jobLeaderListener;

public JobLeaderService(
TaskManagerLocation location,
UnresolvedTaskManagerLocation location,
RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
this.ownLocation = Preconditions.checkNotNull(location);
this.retryingRegistrationConfiguration = Preconditions.checkNotNull(retryingRegistrationConfiguration);
Expand Down Expand Up @@ -418,7 +418,7 @@ private static final class JobManagerRetryingRegistration

private final String taskManagerRpcAddress;

private final TaskManagerLocation taskManagerLocation;
private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;

JobManagerRetryingRegistration(
Logger log,
Expand All @@ -429,7 +429,7 @@ private static final class JobManagerRetryingRegistration
JobMasterId jobMasterId,
RetryingRegistrationConfiguration retryingRegistrationConfiguration,
String taskManagerRpcAddress,
TaskManagerLocation taskManagerLocation) {
UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) {
super(
log,
rpcService,
Expand All @@ -440,15 +440,15 @@ private static final class JobManagerRetryingRegistration
retryingRegistrationConfiguration);

this.taskManagerRpcAddress = taskManagerRpcAddress;
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.unresolvedTaskManagerLocation = Preconditions.checkNotNull(unresolvedTaskManagerLocation);
}

@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
JobMasterGateway gateway,
JobMasterId fencingToken,
long timeoutMillis) {
return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
return gateway.registerTaskManager(taskManagerRpcAddress, unresolvedTaskManagerLocation, Time.milliseconds(timeoutMillis));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -183,7 +183,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// --------- TaskManager services --------

/** The connection information of this task manager. */
private final TaskManagerLocation taskManagerLocation;
private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;

private final TaskManagerMetricGroup taskManagerMetricGroup;

Expand Down Expand Up @@ -272,7 +272,7 @@ public TaskExecutor(
this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
this.jobManagerTable = taskExecutorServices.getJobManagerTable();
this.jobLeaderService = taskExecutorServices.getJobLeaderService();
this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation();
this.unresolvedTaskManagerLocation = taskExecutorServices.getUnresolvedTaskManagerLocation();
this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
this.kvStateService = taskExecutorServices.getKvStateService();
Expand All @@ -286,7 +286,7 @@ public TaskExecutor(
this.resourceManagerConnection = null;
this.currentRegistrationTimeoutId = null;

final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
final ResourceID resourceId = taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}
Expand Down Expand Up @@ -1044,7 +1044,7 @@ private void connectToResourceManager() {
final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
getAddress(),
getResourceID(),
taskManagerLocation.dataPort(),
unresolvedTaskManagerLocation.getDataPort(),
hardwareDescription,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
Expand Down Expand Up @@ -1686,7 +1686,7 @@ private CompletableFuture<TransientBlobKey> requestFileUploadByFilePath(String f
// ------------------------------------------------------------------------

public ResourceID getResourceID() {
return taskManagerLocation.getResourceID();
return unresolvedTaskManagerLocation.getResourceID();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
Expand Down Expand Up @@ -132,7 +131,7 @@ public TaskManagerRunner(Configuration configuration, ResourceID resourceId) thr
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

rpcService = createRpcService(configuration, highAvailabilityServices);

Expand Down Expand Up @@ -355,7 +354,7 @@ public static TaskExecutor startTaskManager(

LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

InetAddress externalAddress = InetAddress.getByName(rpcService.getAddress());
String externalAddress = rpcService.getAddress();

final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

Expand All @@ -369,7 +368,7 @@ public static TaskExecutor startTaskManager(

Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
TaskManagerLocation.getHostName(externalAddress),
externalAddress,
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

Expand Down
Loading

0 comments on commit 9dc7a4a

Please sign in to comment.