Skip to content

Commit

Permalink
[FLINK-8120] [flip6] Register Yarn application with correct tracking URL
Browse files Browse the repository at this point in the history
The cluster entrypoints start the ResourceManager with the web interface URL.
This URL is used to set the correct tracking URL in Yarn when registering the
Yarn application.

This closes apache#5128.
  • Loading branch information
tillrohrmann committed Dec 14, 2017
1 parent e80dd8e commit 627bcda
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -124,13 +126,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

/**
* Entry point for Mesos session clusters.
*/
Expand Down Expand Up @@ -114,13 +116,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
null);

jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);

Expand Down Expand Up @@ -272,7 +273,8 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;

protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
dispatcherRestEndpoint.getRestAddress());

dispatcher = createDispatcher(
configuration,
Expand Down Expand Up @@ -238,5 +239,6 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;

import javax.annotation.Nullable;

/**
* Entry point for the standalone session cluster.
*/
Expand All @@ -52,7 +54,8 @@ protected ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -100,6 +102,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme

private final YarnConfiguration yarnConfig;

@Nullable
private final String webInterfaceUrl;

/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

Expand All @@ -123,7 +128,8 @@ public YarnResourceManager(
SlotManager slotManager,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) {
super(
rpcService,
resourceManagerEndpointId,
Expand Down Expand Up @@ -153,36 +159,50 @@ public YarnResourceManager(
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
numPendingContainerRequests = 0;

this.webInterfaceUrl = webInterfaceUrl;
}

protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
AMRMClientAsync<AMRMClient.ContainerRequest> rmc = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
rmc.init(yarnConfig);
rmc.start();
try {
//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
//TODO: the third paramter should be the webmonitor address
rmc.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
} catch (Exception e) {
log.info("registerApplicationMaster fail", e);
}
return rmc;
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
YarnConfiguration yarnConfiguration,
int yarnHeartbeatIntervalMillis,
@Nullable String webInterfaceUrl) throws Exception {
AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
yarnHeartbeatIntervalMillis,
this);

resourceManagerClient.init(yarnConfiguration);
resourceManagerClient.start();

//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
Tuple2<String, Integer> hostPort = parseHostPort(getAddress());

resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, webInterfaceUrl);

return resourceManagerClient;
}

protected NMClient createAndStartNodeManagerClient() {
protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
// create the client to communicate with the node managers
NMClient nmc = NMClient.createNMClient();
nmc.init(yarnConfig);
nmc.start();
nmc.cleanupRunningContainersOnStop(true);
return nmc;
NMClient nodeManagerClient = NMClient.createNMClient();
nodeManagerClient.init(yarnConfiguration);
nodeManagerClient.start();
nodeManagerClient.cleanupRunningContainersOnStop(true);
return nodeManagerClient;
}

@Override
protected void initialize() throws ResourceManagerException {
resourceManagerClient = createAndStartResourceManagerClient();
nodeManagerClient = createAndStartNodeManagerClient();
try {
resourceManagerClient = createAndStartResourceManagerClient(
yarnConfig,
yarnHeartbeatIntervalMillis,
webInterfaceUrl);
} catch (Exception e) {
throw new ResourceManagerException("Could not start resource manager client.", e);
}

nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
}

@Override
Expand Down Expand Up @@ -222,7 +242,8 @@ protected void shutDownApplication(ApplicationStatus finalStatus, String optiona

// first, de-register from YARN
FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
log.info("Unregistering application from the YARN Resource Manager");
log.info("Unregister application from the YARN Resource Manager");

try {
resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import org.apache.hadoop.yarn.api.ApplicationConstants;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -79,7 +81,8 @@ protected ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand All @@ -99,7 +102,8 @@ protected ResourceManager<?> createResourceManager(
rmRuntimeServices.getSlotManager(),
metricRegistry,
rmRuntimeServices.getJobLeaderIdService(),
fatalErrorHandler);
fatalErrorHandler,
webInterfaceUrl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

import org.apache.hadoop.yarn.api.ApplicationConstants;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -69,7 +71,8 @@ protected ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand All @@ -89,7 +92,8 @@ protected ResourceManager<?> createResourceManager(
rmRuntimeServices.getSlotManager(),
metricRegistry,
rmRuntimeServices.getJobLeaderIdService(),
fatalErrorHandler);
fatalErrorHandler,
webInterfaceUrl);
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -73,6 +74,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -149,11 +152,23 @@ public TestingYarnResourceManager(
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl,
AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient,
NMClient mockNMClient) {
super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env,
resourceManagerConfiguration, highAvailabilityServices, heartbeatServices,
slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
super(
rpcService,
resourceManagerEndpointId,
resourceId,
flinkConfig,
env,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
metricRegistry,
jobLeaderIdService,
fatalErrorHandler,
webInterfaceUrl);
this.mockNMClient = mockNMClient;
this.mockResourceManagerClient = mockResourceManagerClient;
}
Expand All @@ -167,12 +182,15 @@ public MainThreadExecutor getMainThreadExecutorForTesting() {
}

@Override
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
YarnConfiguration yarnConfiguration,
int yarnHeartbeatIntervalMillis,
@Nullable String webInteraceUrl) {
return mockResourceManagerClient;
}

@Override
protected NMClient createAndStartNodeManagerClient() {
protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
return mockNMClient;
}
}
Expand Down Expand Up @@ -231,9 +249,9 @@ static class Context {
rmServices.metricRegistry,
rmServices.jobLeaderIdService,
fatalErrorHandler,
null,
mockResourceManagerClient,
mockNMClient
);
mockNMClient);
}

/**
Expand Down

0 comments on commit 627bcda

Please sign in to comment.