Skip to content

Commit

Permalink
[FLINK-7648] [flip6] Add TaskManagersHandler
Browse files Browse the repository at this point in the history
Send dataPort and HardwareDescription to RM

Instantiate RM leader retriever
  • Loading branch information
tillrohrmann committed Nov 7, 2017
1 parent eddb5b0 commit def8781
Show file tree
Hide file tree
Showing 20 changed files with 749 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
Expand Down Expand Up @@ -644,9 +645,11 @@ public void testWorkerStarted() throws Exception {
startResourceManager();
assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));

final int dataPort = 1234;
final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
// send registration message
CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, timeout);
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, dataPort, hardwareDescription, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
Expand Down Expand Up @@ -59,6 +61,7 @@
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
Expand Down Expand Up @@ -87,6 +90,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Configuration clusterConfiguration;
private final RestHandlerConfiguration restConfiguration;
private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
private final Executor executor;

private final ExecutionGraphCache executionGraphCache;
Expand All @@ -97,11 +101,13 @@ public DispatcherRestEndpoint(
GatewayRetriever<DispatcherGateway> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
Executor executor) {
super(endpointConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);
this.executor = Preconditions.checkNotNull(executor);

this.executionGraphCache = new ExecutionGraphCache(
Expand Down Expand Up @@ -223,7 +229,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executionGraphCache,
executor,
checkpointStatsCache);

JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
restAddressFuture,
leaderRetriever,
Expand All @@ -232,7 +238,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
JobExceptionsHeaders.getInstance(),
executionGraphCache,
executor);

JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
restAddressFuture,
leaderRetriever,
Expand All @@ -254,6 +260,14 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
timeout,
responseHeaders);

TaskManagersHandler<DispatcherGateway> taskManagersHandler = new TaskManagersHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
TaskManagersHeaders.getInstance(),
resourceManagerRetriever);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand Down Expand Up @@ -284,6 +298,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));

// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand All @@ -56,6 +57,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {

private LeaderRetrievalService dispatcherLeaderRetrievalService;

private LeaderRetrievalService resourceManagerRetrievalService;

private DispatcherRestEndpoint dispatcherRestEndpoint;

public SessionClusterEntrypoint(Configuration configuration) {
Expand All @@ -73,16 +76,26 @@ protected void startClusterComponents(

dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::new,
10,
Time.milliseconds(50L));

LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::new,
10,
Time.milliseconds(50L));

dispatcherRestEndpoint = createDispatcherRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
rpcService.getExecutor());

LOG.debug("Starting Dispatcher REST endpoint.");
Expand Down Expand Up @@ -110,6 +123,7 @@ protected void startClusterComponents(

LOG.debug("Starting ResourceManager.");
resourceManager.start();
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

LOG.debug("Starting Dispatcher.");
dispatcher.start();
Expand Down Expand Up @@ -140,6 +154,14 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
}
}

if (resourceManagerRetrievalService != null) {
try {
resourceManagerRetrievalService.stop();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}

if (resourceManager != null) {
try {
resourceManager.shutDown();
Expand All @@ -156,13 +178,15 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
/**
 * Builds the {@link DispatcherRestEndpoint} for this entrypoint.
 *
 * <p>The REST server endpoint configuration and the REST handler configuration are both
 * derived from the given {@code configuration}, which is additionally handed to the endpoint
 * unchanged as its cluster configuration.
 *
 * @param configuration configuration from which the endpoint and handler settings are derived
 * @param dispatcherGatewayRetriever retriever passed to the endpoint for the dispatcher gateway
 * @param resourceManagerGatewayRetriever retriever passed to the endpoint for the resource manager gateway
 * @param executor executor passed to the endpoint
 * @return the newly constructed endpoint
 * @throws Exception if deriving one of the configurations or constructing the endpoint fails
 */
protected DispatcherRestEndpoint createDispatcherRestEndpoint(
        Configuration configuration,
        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
        Executor executor) throws Exception {

    // Derive both REST-specific configurations up front, in the same order as they are consumed.
    final RestServerEndpointConfiguration endpointConfiguration =
        RestServerEndpointConfiguration.fromConfiguration(configuration);
    final RestHandlerConfiguration handlerConfiguration =
        RestHandlerConfiguration.fromConfiguration(configuration);

    return new DispatcherRestEndpoint(
        endpointConfiguration,
        dispatcherGatewayRetriever,
        configuration,
        handlerConfiguration,
        resourceManagerGatewayRetriever,
        executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import org.apache.flink.runtime.util.Hardware;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Objects;

/**
* A hardware description describes the resources available to a task manager.
Expand All @@ -29,16 +33,28 @@ public final class HardwareDescription implements Serializable {

private static final long serialVersionUID = 3380016608300325361L;

public static final String FIELD_NAME_CPU_CORES = "cpuCores";

public static final String FIELD_NAME_SIZE_PHYSICAL_MEMORY = "physicalMemory";

public static final String FIELD_NAME_SIZE_JVM_HEAP = "freeMemory";

public static final String FIELD_NAME_SIZE_MANAGED_MEMORY = "managedMemory";

/** The number of CPU cores available to the JVM on the compute node. */
@JsonProperty(FIELD_NAME_CPU_CORES)
private final int numberOfCPUCores;

/** The size of physical memory in bytes available on the compute node. */
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY)
private final long sizeOfPhysicalMemory;

/** The size of the JVM heap memory. */
@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP)
private final long sizeOfJvmHeap;

/** The size of the memory managed by the system for caching, hashing, sorting, ... */
@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY)
private final long sizeOfManagedMemory;

/**
Expand All @@ -49,7 +65,12 @@ public final class HardwareDescription implements Serializable {
* @param sizeOfJvmHeap The size of the JVM heap memory.
* @param sizeOfManagedMemory The size of the memory managed by the system for caching, hashing, sorting, ...
*/
public HardwareDescription(int numberOfCPUCores, long sizeOfPhysicalMemory, long sizeOfJvmHeap, long sizeOfManagedMemory) {
@JsonCreator
public HardwareDescription(
@JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores,
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory,
@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap,
@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) {
this.numberOfCPUCores = numberOfCPUCores;
this.sizeOfPhysicalMemory = sizeOfPhysicalMemory;
this.sizeOfJvmHeap = sizeOfJvmHeap;
Expand Down Expand Up @@ -96,6 +117,27 @@ public long getSizeOfManagedMemory() {
// Utils
// --------------------------------------------------------------------------------------------


/**
 * Compares this hardware description with another object.
 *
 * <p>Two descriptions are equal when they are of exactly the same class and agree on the
 * number of CPU cores, the physical memory size, the JVM heap size and the managed memory size.
 *
 * @param o the object to compare with, may be {@code null}
 * @return {@code true} if {@code o} describes the same hardware, {@code false} otherwise
 */
@Override
public boolean equals(Object o) {
    // Same reference: trivially equal.
    if (o == this) {
        return true;
    }
    // Null or a different runtime class can never be equal.
    if (o == null) {
        return false;
    }
    if (o.getClass() != getClass()) {
        return false;
    }

    final HardwareDescription other = (HardwareDescription) o;

    // Field-by-field comparison, bailing out on the first mismatch.
    if (other.numberOfCPUCores != numberOfCPUCores) {
        return false;
    }
    if (other.sizeOfPhysicalMemory != sizeOfPhysicalMemory) {
        return false;
    }
    if (other.sizeOfJvmHeap != sizeOfJvmHeap) {
        return false;
    }
    return other.sizeOfManagedMemory == sizeOfManagedMemory;
}

/**
 * Computes a hash code from the same four fields that {@code equals} compares.
 *
 * <p>The value is the 31-based polynomial fold (seed 1) over the core count and the three
 * memory sizes, in that order, which is the same result {@code Objects.hash} yields for
 * these values.
 *
 * @return the hash code of this hardware description
 */
@Override
public int hashCode() {
    int hash = 1;
    hash = 31 * hash + Integer.hashCode(numberOfCPUCores);
    hash = 31 * hash + Long.hashCode(sizeOfPhysicalMemory);
    hash = 31 * hash + Long.hashCode(sizeOfJvmHeap);
    hash = 31 * hash + Long.hashCode(sizeOfManagedMemory);
    return hash;
}

@Override
public String toString() {
return String.format("cores=%d, physMem=%d, heap=%d, managed=%d",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
Expand All @@ -51,6 +52,7 @@
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -330,6 +332,8 @@ public CompletableFuture<RegistrationResponse> registerTaskExecutor(
final String taskExecutorAddress,
final ResourceID taskExecutorResourceId,
final SlotReport slotReport,
final int dataPort,
final HardwareDescription hardwareDescription,
final Time timeout) {

CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
Expand All @@ -343,7 +347,9 @@ public CompletableFuture<RegistrationResponse> registerTaskExecutor(
taskExecutorGateway,
taskExecutorAddress,
taskExecutorResourceId,
slotReport);
slotReport,
dataPort,
hardwareDescription);
}
},
getMainThreadExecutor());
Expand Down Expand Up @@ -485,6 +491,28 @@ public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
return CompletableFuture.completedFuture(taskExecutors.size());
}

/**
 * Collects a {@link TaskManagerInfo} for every task executor currently held in
 * {@code taskExecutors}.
 *
 * <p>Each entry combines the worker registration's instance id, gateway address, data port and
 * hardware description with the last heartbeat reported by the task manager heartbeat manager
 * and the registered/free slot counts reported by the slot manager.
 *
 * @param timeout not used by this implementation; the result is computed synchronously
 * @return an already completed future holding one info object per registered task executor
 */
@Override
public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {

    final Collection<TaskManagerInfo> collectedInfos = new ArrayList<>(taskExecutors.size());

    for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
        final ResourceID resourceId = entry.getKey();
        final WorkerRegistration<WorkerType> registration = entry.getValue();

        final TaskManagerInfo info = new TaskManagerInfo(
            registration.getInstanceID(),
            registration.getTaskExecutorGateway().getAddress(),
            registration.getDataPort(),
            // heartbeats are tracked per resource id, slots per instance id
            taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
            slotManager.getNumberRegisteredSlotsOf(registration.getInstanceID()),
            slotManager.getNumberFreeSlotsOf(registration.getInstanceID()),
            registration.getHardwareDescription());

        collectedInfos.add(info);
    }

    return CompletableFuture.completedFuture(collectedInfos);
}

@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
final int numberSlots = slotManager.getNumberRegisteredSlots();
Expand Down Expand Up @@ -588,13 +616,17 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
* @param taskExecutorAddress address of the TaskExecutor
* @param taskExecutorResourceId ResourceID of the TaskExecutor
* @param slotReport initial slot report from the TaskExecutor
* @param dataPort port used for data transfer
* @param hardwareDescription hardware description of the registering TaskExecutor
* @return RegistrationResponse
*/
private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
String taskExecutorAddress,
ResourceID taskExecutorResourceId,
SlotReport slotReport) {
TaskExecutorGateway taskExecutorGateway,
String taskExecutorAddress,
ResourceID taskExecutorResourceId,
SlotReport slotReport,
int dataPort,
HardwareDescription hardwareDescription) {
WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
Expand All @@ -612,7 +644,7 @@ private RegistrationResponse registerTaskExecutorInternal(
return new RegistrationResponse.Decline("unrecognized TaskExecutor");
} else {
WorkerRegistration<WorkerType> registration =
new WorkerRegistration<>(taskExecutorGateway, newWorker);
new WorkerRegistration<>(taskExecutorGateway, newWorker, dataPort, hardwareDescription);

taskExecutors.put(taskExecutorResourceId, registration);

Expand Down
Loading

0 comments on commit def8781

Please sign in to comment.