Skip to content

Commit

Permalink
[FLINK-16440][runtime] Extend ResourceOverview and TaskManager(Detail…
Browse files Browse the repository at this point in the history
…s)Info for registered/free resources.

This closes apache#11363.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 27, 2020
1 parent e7e5e1d commit 44fde5d
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 4 deletions.
62 changes: 62 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -2642,6 +2642,37 @@
"freeSlots" : {
"type" : "integer"
},
"totalResource" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
"properties" : {
"cpuCores" : {
"type" : "number"
},
"taskHeapMemory" : {
"type" : "integer"
},
"taskOffHeapMemory" : {
"type" : "integer"
},
"managedMemory" : {
"type" : "integer"
},
"networkMemory" : {
"type" : "integer"
},
"extendedResources" : {
"type" : "object",
"additionalProperties" : {
"type" : "number"
}
}
}
},
"freeResource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
Expand Down Expand Up @@ -2729,6 +2760,37 @@
"freeSlots" : {
"type" : "integer"
},
"totalResource" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
"properties" : {
"cpuCores" : {
"type" : "number"
},
"taskHeapMemory" : {
"type" : "integer"
},
"taskOffHeapMemory" : {
"type" : "integer"
},
"managedMemory" : {
"type" : "integer"
},
"networkMemory" : {
"type" : "integer"
},
"extendedResources" : {
"type" : "object",
"additionalProperties" : {
"type" : "number"
}
}
}
},
"freeResource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
Expand Down Expand Up @@ -535,6 +536,8 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Tim
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription()));
}

Expand All @@ -557,6 +560,8 @@ public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID reso
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId),
slotManager.getNumberFreeSlotsOf(instanceId),
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription());

return CompletableFuture.completedFuture(taskManagerInfo);
Expand All @@ -567,12 +572,16 @@ public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID reso
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
final int numberSlots = slotManager.getNumberRegisteredSlots();
final int numberFreeSlots = slotManager.getNumberFreeSlots();
final ResourceProfile totalResource = slotManager.getRegisteredResource();
final ResourceProfile freeResource = slotManager.getFreeResource();

return CompletableFuture.completedFuture(
new ResourceOverview(
taskExecutors.size(),
numberSlots,
numberFreeSlots));
numberFreeSlots,
totalResource,
freeResource));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

import java.io.Serializable;

/**
Expand All @@ -27,18 +29,24 @@ public class ResourceOverview implements Serializable {

private static final long serialVersionUID = 7618746920569224557L;

private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = new ResourceOverview(0, 0, 0);
private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);

private final int numberTaskManagers;

private final int numberRegisteredSlots;

private final int numberFreeSlots;

public ResourceOverview(int numberTaskManagers, int numberRegisteredSlots, int numberFreeSlots) {
private final ResourceProfile totalResource;

private final ResourceProfile freeResource;

public ResourceOverview(int numberTaskManagers, int numberRegisteredSlots, int numberFreeSlots, ResourceProfile totalResource, ResourceProfile freeResource) {
this.numberTaskManagers = numberTaskManagers;
this.numberRegisteredSlots = numberRegisteredSlots;
this.numberFreeSlots = numberFreeSlots;
this.totalResource = totalResource;
this.freeResource = freeResource;
}

public int getNumberTaskManagers() {
Expand All @@ -53,6 +61,14 @@ public int getNumberFreeSlots() {
return numberFreeSlots;
}

public ResourceProfile getTotalResource() {
return totalResource;
}

public ResourceProfile getFreeResource() {
return freeResource;
}

public static ResourceOverview empty() {
return EMPTY_RESOURCE_OVERVIEW;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
Expand Down Expand Up @@ -59,6 +60,14 @@ public interface SlotManager extends AutoCloseable {
*/
Map<WorkerResourceSpec, Integer> getRequiredResources();

ResourceProfile getRegisteredResource();

ResourceProfile getRegisteredResourceOf(InstanceID instanceID);

ResourceProfile getFreeResource();

ResourceProfile getFreeResourceOf(InstanceID instanceID);

int getNumberPendingSlotRequests();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,34 @@ public Map<WorkerResourceSpec, Integer> getRequiredResources() {
Collections.emptyMap();
}

@Override
public ResourceProfile getRegisteredResource() {
return getResourceFromNumSlots(getNumberRegisteredSlots());
}

@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
}

@Override
public ResourceProfile getFreeResource() {
return getResourceFromNumSlots(getNumberFreeSlots());
}

@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
}

private ResourceProfile getResourceFromNumSlots(int numSlots) {
if (numSlots < 0 || defaultSlotResourceProfile == null) {
return ResourceProfile.UNKNOWN;
} else {
return defaultSlotResourceProfile.multiply(numSlots);
}
}

@VisibleForTesting
public int getNumberPendingTaskManagerSlots() {
return pendingSlots.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Contains information of a {@link ResourceProfile}.
*/
public class ResourceProfileInfo implements ResponseBody {

public static final String FIELD_NAME_CPU = "cpuCores";

public static final String FIELD_NAME_TASK_HEAP = "taskHeapMemory";

public static final String FIELD_NAME_TASK_OFFHEAP = "taskOffHeapMemory";

public static final String FIELD_NAME_MANAGED = "managedMemory";

public static final String FIELD_NAME_NETWORK = "networkMemory";

public static final String FIELD_NAME_EXTENDED = "extendedResources";

@JsonProperty(FIELD_NAME_CPU)
private final double cpu;

@JsonProperty(FIELD_NAME_TASK_HEAP)
private final int taskHeapMB;

@JsonProperty(FIELD_NAME_TASK_OFFHEAP)
private final int taskOffHeapMB;

@JsonProperty(FIELD_NAME_MANAGED)
private final int managedMB;

@JsonProperty(FIELD_NAME_NETWORK)
private final int networkMB;

@JsonProperty(FIELD_NAME_EXTENDED)
private final Map<String, Double> extendedResources;

@JsonCreator
public ResourceProfileInfo(
@JsonProperty(FIELD_NAME_CPU) double cpu,
@JsonProperty(FIELD_NAME_TASK_HEAP) int taskHeapMB,
@JsonProperty(FIELD_NAME_TASK_OFFHEAP) int taskOffHeapMB,
@JsonProperty(FIELD_NAME_MANAGED) int managedMB,
@JsonProperty(FIELD_NAME_NETWORK) int networkMB,
@JsonProperty(FIELD_NAME_EXTENDED) Map<String, Double> extendedResources) {
this.cpu = cpu;
this.taskHeapMB = taskHeapMB;
this.taskOffHeapMB = taskOffHeapMB;
this.managedMB = managedMB;
this.networkMB = networkMB;
this.extendedResources = extendedResources;
}

/**
* The ResourceProfile must not be UNKNOWN.
*/
private ResourceProfileInfo(ResourceProfile resourceProfile) {
this(resourceProfile.getCpuCores().getValue().doubleValue(),
resourceProfile.getTaskHeapMemory().getMebiBytes(),
resourceProfile.getTaskOffHeapMemory().getMebiBytes(),
resourceProfile.getManagedMemory().getMebiBytes(),
resourceProfile.getNetworkMemory().getMebiBytes(),
getExetendedResources(resourceProfile.getExtendedResources()));
}

private ResourceProfileInfo() {
this(-1.0, -1, -1, -1, -1, Collections.emptyMap());
}

public static ResourceProfileInfo fromResrouceProfile(ResourceProfile resourceProfile) {
return resourceProfile.equals(ResourceProfile.UNKNOWN) ? new ResourceProfileInfo() : new ResourceProfileInfo(resourceProfile);
}

private static Map<String, Double> getExetendedResources(Map<String, Resource> exetendedResources) {
return exetendedResources.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue().doubleValue()));
}

public double getCpu() {
return cpu;
}

public int getTaskHeapMB() {
return taskHeapMB;
}

public int getTaskOffHeapMB() {
return taskOffHeapMB;
}

public int getManagedMB() {
return managedMB;
}

public int getNetworkMB() {
return networkMB;
}

public Map<String, Double>getExtendedResources() {
return Collections.unmodifiableMap(extendedResources);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceProfileInfo that = (ResourceProfileInfo) o;
return cpu == that.cpu &&
taskHeapMB == that.taskHeapMB &&
taskOffHeapMB == that.taskOffHeapMB &&
managedMB == that.managedMB &&
networkMB == that.networkMB &&
Objects.equals(extendedResources, that.extendedResources);
}

@Override
public int hashCode() {
return Objects.hash(
cpu,
taskHeapMB,
taskOffHeapMB,
managedMB,
networkMB,
extendedResources);
}
}
Loading

0 comments on commit 44fde5d

Please sign in to comment.