Skip to content

Commit

Permalink
[FLINK-21794][metrics] Support retrieving slot details via rest api
Browse files Browse the repository at this point in the history
This closes apache#15249
  • Loading branch information
KarmaGYZ authored and xintongsong committed Mar 24, 2021
1 parent ea0b91d commit 99c2a41
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 61 deletions.
16 changes: 16 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -3051,6 +3051,22 @@
}
}
},
"allocatedSlots" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:SlotInfo",
"properties" : {
"jobId" : {
"type" : "any"
},
"resource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
}
}
}
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Tim
}

@Override
public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(
public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(
ResourceID resourceId, Time timeout) {

final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(resourceId);
Expand All @@ -687,21 +687,23 @@ public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(
return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(resourceId));
} else {
final InstanceID instanceId = taskExecutor.getInstanceID();
final TaskManagerInfo taskManagerInfo =
new TaskManagerInfo(
resourceId,
taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(),
taskExecutor.getJmxPort(),
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId),
slotManager.getNumberFreeSlotsOf(instanceId),
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription(),
taskExecutor.getMemoryConfiguration());

return CompletableFuture.completedFuture(taskManagerInfo);
final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
new TaskManagerInfoWithSlots(
new TaskManagerInfo(
resourceId,
taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(),
taskExecutor.getJmxPort(),
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId),
slotManager.getNumberFreeSlotsOf(instanceId),
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription(),
taskExecutor.getMemoryConfiguration()),
slotManager.getAllocatedSlotsOf(instanceId));

return CompletableFuture.completedFuture(taskManagerInfoWithSlots);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ void heartbeatFromTaskManager(
CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(@RpcTimeout Time timeout);

/**
* Requests information about the given {@link TaskExecutor}.
* Requests detail information about the given {@link TaskExecutor}.
*
* @param taskManagerId identifying the TaskExecutor for which to return information
* @param timeout of the request
* @return Future TaskManager information
* @return Future TaskManager information and its allocated slots
*/
CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(
CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(
ResourceID taskManagerId, @RpcTimeout Time timeout);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Collections;

/** Contains the base information about a {@link TaskExecutor} and its allocated slots. */
public class TaskManagerInfoWithSlots {

private final TaskManagerInfo taskManagerInfo;
private final Collection<SlotInfo> allocatedSlots;

public TaskManagerInfoWithSlots(
TaskManagerInfo taskManagerInfo, Collection<SlotInfo> allocatedSlots) {
this.taskManagerInfo = Preconditions.checkNotNull(taskManagerInfo);
this.allocatedSlots = Preconditions.checkNotNull(allocatedSlots);
}

public final Collection<SlotInfo> getAllocatedSlots() {
return Collections.unmodifiableCollection(allocatedSlots);
}

public TaskManagerInfo getTaskManagerInfo() {
return taskManagerInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
Expand Down Expand Up @@ -736,6 +737,12 @@ public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
return taskExecutorManager.getTotalFreeResourcesOf(instanceID);
}

@Override
public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
// This information is currently not supported for this slot manager.
return Collections.emptyList();
}

@Override
public int getNumberPendingSlotRequests() {
// only exists for testing purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
Expand Down Expand Up @@ -636,6 +637,15 @@ public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
return taskManagerTracker.getRegisteredResourceOf(instanceID);
}

@Override
public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
return taskManagerTracker.getRegisteredTaskManager(instanceID)
.map(TaskManagerInfo::getAllocatedSlots).map(Map::values)
.orElse(Collections.emptyList()).stream()
.map(slot -> new SlotInfo(slot.getJobId(), slot.getResourceProfile()))
.collect(Collectors.toList());
}

@Override
public int getNumberPendingSlotRequests() {
// only exists for testing purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -70,6 +72,8 @@ public interface SlotManager extends AutoCloseable {

ResourceProfile getFreeResourceOf(InstanceID instanceID);

Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID);

int getNumberPendingSlotRequests();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
Expand All @@ -53,6 +54,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -264,6 +266,12 @@ public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
.orElse(ResourceProfile.ZERO);
}

@Override
public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
// This information is currently not supported for this slot manager.
return Collections.emptyList();
}

@VisibleForTesting
public int getNumberPendingTaskManagerSlots() {
return pendingSlots.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
Expand All @@ -31,7 +32,6 @@
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
Expand Down Expand Up @@ -87,14 +87,14 @@ protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
final ResourceID taskManagerResourceId =
request.getPathParameter(TaskManagerIdPathParameter.class);

CompletableFuture<TaskManagerInfo> taskManagerInfoFuture =
gateway.requestTaskManagerInfo(taskManagerResourceId, timeout);
CompletableFuture<TaskManagerInfoWithSlots> taskManagerInfoWithSlotsFuture =
gateway.requestTaskManagerDetailsInfo(taskManagerResourceId, timeout);

metricFetcher.update();

return taskManagerInfoFuture
return taskManagerInfoWithSlotsFuture
.thenApply(
(TaskManagerInfo taskManagerInfo) -> {
(taskManagerInfoWithSlots) -> {
final MetricStore.TaskManagerMetricStore tmMetrics =
metricStore.getTaskManagerMetricStore(
taskManagerResourceId.getResourceIdString());
Expand All @@ -114,7 +114,7 @@ protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
}

return new TaskManagerDetailsInfo(
taskManagerInfo, taskManagerMetricsInfo);
taskManagerInfoWithSlots, taskManagerMetricsInfo);
})
.exceptionally(
(Throwable throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.rest.messages.ResourceProfileInfo;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.io.Serializable;
import java.util.Objects;

/**
* Class containing information for a slot of {@link
* org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlot}.
*/
public class SlotInfo implements ResponseBody, Serializable {
private static final long serialVersionUID = 1L;

public static final String FIELD_NAME_RESOURCE = "resource";

public static final String FIELD_NAME_JOB_ID = "jobId";

@JsonProperty(FIELD_NAME_RESOURCE)
private final ResourceProfileInfo resource;

@JsonProperty(FIELD_NAME_JOB_ID)
@JsonSerialize(using = JobIDSerializer.class)
private final JobID jobId;

@JsonCreator
public SlotInfo(
@JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID)
JobID jobId,
@JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource) {
this.jobId = Preconditions.checkNotNull(jobId);
this.resource = Preconditions.checkNotNull(resource);
}

public SlotInfo(JobID jobId, ResourceProfile resource) {
this(jobId, ResourceProfileInfo.fromResrouceProfile(resource));
}

@JsonIgnore
public JobID getJobId() {
return jobId;
}

@JsonIgnore
public ResourceProfileInfo getResource() {
return resource;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlotInfo that = (SlotInfo) o;
return Objects.equals(jobId, that.jobId) && Objects.equals(resource, that.resource);
}

@Override
public int hashCode() {
return Objects.hash(jobId, resource);
}
}
Loading

0 comments on commit 99c2a41

Please sign in to comment.