Skip to content

Commit

Permalink
[FLINK-7928] [runtime] extend the resources in ResourceProfile for pr…
Browse files Browse the repository at this point in the history
…ecisely calculating the resource of task manager

Summary:
ResourceProfile denotes the resource requirements of a task. It should contains:
1. The resource for the operators: the resources in ResourceSpec (please refer to jira-7878)
2. The resource for the task to communicate with its upstreams.
3. The resource for the task to communicate with its downstreams.
Now the ResourceProfile only contains the first part. Adding the last two parts.

Test Plan: UnitTests

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D330364

This closes apache#4991.
  • Loading branch information
tiemsn authored and tillrohrmann committed Dec 14, 2017
1 parent fba72d0 commit 5643d15
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ResourceSpec implements Serializable {

public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);

private static final String GPU_NAME = "GPU";
public static final String GPU_NAME = "GPU";

/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
private final double cpuCores;
Expand Down Expand Up @@ -146,7 +146,7 @@ public int getStateSize() {
public double getGPUResource() {
Resource gpuResource = extendedResources.get(GPU_NAME);
if (gpuResource != null) {
return gpuResource.value;
return gpuResource.getValue();
}
return 0.0;
}
Expand Down Expand Up @@ -249,8 +249,8 @@ public static Builder newBuilder() {
*/
public static class Builder {

public double cpuCores;
public int heapMemoryInMB;
private double cpuCores;
private int heapMemoryInMB;
private int directMemoryInMB;
private int nativeMemoryInMB;
private int stateSizeInMB;
Expand Down Expand Up @@ -300,7 +300,7 @@ public ResourceSpec build() {
/**
* Base class for additional resources one can specify.
*/
protected abstract static class Resource implements Serializable {
public abstract static class Resource implements Serializable {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -372,6 +372,18 @@ public int hashCode() {
return result;
}

public String getName() {
return this.name;
}

public ResourceAggregateType getAggregateType() {
return this.type;
}

public double getValue() {
return this.value;
}

/**
* Create a resource of the same resource type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,30 @@

package org.apache.flink.runtime.clusterframework.types;

import org.apache.flink.api.common.operators.ResourceSpec;

import javax.annotation.Nonnull;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
* Describe the resource profile of the slot, either when requiring or offering it. The profile can be
* Describe the immutable resource profile of the slot, either when requiring or offering it. The profile can be
* checked whether it can match another profile's requirement, and furthermore we may calculate a matching
* score to decide which profile we should choose when we have lots of candidate slots.
* It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster.
*
* <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
* <ol>
* <li>Memory Size</li>
* <li>CPU cores</li>
* <li>Extended resources</li>
* </ol>
* The extended resources are compared ordered by the resource names.
*/
public class ResourceProfile implements Serializable, Comparable<ResourceProfile> {

Expand All @@ -52,6 +63,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
/** How many native memory in mb are needed */
private final int nativeMemoryInMB;

/** A extensible field for user specified resources from {@link ResourceSpec}. */
private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1);

// ------------------------------------------------------------------------

/**
Expand All @@ -61,16 +75,21 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param heapMemoryInMB The size of the heap memory, in megabytes.
* @param directMemoryInMB The size of the direct memory, in megabytes.
* @param nativeMemoryInMB The size of the native memory, in megabytes.
* @param extendedResources The extendiable resources such as GPU and FPGA
*/
public ResourceProfile(
double cpuCores,
int heapMemoryInMB,
int directMemoryInMB,
int nativeMemoryInMB) {
int nativeMemoryInMB,
Map<String, ResourceSpec.Resource> extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
this.nativeMemoryInMB = nativeMemoryInMB;
if (extendedResources != null) {
this.extendedResources.putAll(extendedResources);
}
}

/**
Expand All @@ -80,10 +99,7 @@ public ResourceProfile(
* @param heapMemoryInMB The size of the heap memory, in megabytes.
*/
public ResourceProfile(double cpuCores, int heapMemoryInMB) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = 0;
this.nativeMemoryInMB = 0;
this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
}

/**
Expand All @@ -92,72 +108,127 @@ public ResourceProfile(double cpuCores, int heapMemoryInMB) {
* @param other The ResourceProfile to copy.
*/
public ResourceProfile(ResourceProfile other) {
this.cpuCores = other.cpuCores;
this.heapMemoryInMB = other.heapMemoryInMB;
this.directMemoryInMB = other.directMemoryInMB;
this.nativeMemoryInMB = other.nativeMemoryInMB;
this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
}

// ------------------------------------------------------------------------

/**
* Get the cpu cores needed
* Get the cpu cores needed.
*
* @return The cpu cores, 1.0 means a full cpu thread
*/
public double getCpuCores() {
return cpuCores;
}

/**
* Get the heap memory needed in MB
* Get the heap memory needed in MB.
*
* @return The heap memory in MB
*/
public long getHeapMemoryInMB() {
public int getHeapMemoryInMB() {
return heapMemoryInMB;
}

/**
* Get the direct memory needed in MB
* Get the direct memory needed in MB.
*
* @return The direct memory in MB
*/
public int getDirectMemoryInMB() {
return directMemoryInMB;
}

/**
* Get the native memory needed in MB
* Get the native memory needed in MB.
*
* @return The native memory in MB
*/
public int getNativeMemoryInMB() {
return nativeMemoryInMB;
}

/**
* Get the total memory needed in MB
* Get the total memory needed in MB.
*
* @return The total memory in MB
*/
public int getMemoryInMB() {
return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
}

/**
* Check whether required resource profile can be matched
* Get the memory the operators needed in MB.
*
* @return The operator memory in MB
*/
public int getOperatorsMemoryInMB() {
return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
}

/**
* Get the extended resources.
*
* @return The extended resources
*/
public Map<String, ResourceSpec.Resource> getExtendedResources() {
return Collections.unmodifiableMap(extendedResources);
}

/**
* Check whether required resource profile can be matched.
*
* @param required the required resource profile
* @return true if the requirement is matched, otherwise false
*/
public boolean isMatching(ResourceProfile required) {
return cpuCores >= required.getCpuCores() &&
if (cpuCores >= required.getCpuCores() &&
heapMemoryInMB >= required.getHeapMemoryInMB() &&
directMemoryInMB >= required.getDirectMemoryInMB() &&
nativeMemoryInMB >= required.getNativeMemoryInMB();
nativeMemoryInMB >= required.getNativeMemoryInMB()) {
for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey()) ||
!extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) ||
extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) {
return false;
}
}
return true;
}
return false;
}

@Override
public int compareTo(@Nonnull ResourceProfile other) {
int cmp1 = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
return (cmp1 != 0) ? cmp1 : cmp2;
int cmp = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
if (cmp == 0) {
cmp = Double.compare(this.cpuCores, other.cpuCores);
}
if (cmp == 0) {
Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator();
Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator();
while (thisIterator.hasNext() && otherIterator.hasNext()) {
Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next();
Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next();
if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
return cmp;
}
if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) {
return 1;
}
if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) {
return cmp;
}
}
if (thisIterator.hasNext()) {
return 1;
}
if (otherIterator.hasNext()) {
return -1;
}
}
return cmp;
}

// ------------------------------------------------------------------------
Expand All @@ -169,6 +240,7 @@ public int hashCode() {
result = 31 * result + heapMemoryInMB;
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + extendedResources.hashCode();
return result;
}

Expand All @@ -181,20 +253,40 @@ else if (obj != null && obj.getClass() == ResourceProfile.class) {
ResourceProfile that = (ResourceProfile) obj;
return this.cpuCores == that.cpuCores &&
this.heapMemoryInMB == that.heapMemoryInMB &&
this.directMemoryInMB == that.directMemoryInMB;
}
else {
return false;
this.directMemoryInMB == that.directMemoryInMB &&
Objects.equals(extendedResources, that.extendedResources);
}
return false;
}

@Override
public String toString() {
String resourceStr = "";
for (Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) {
resourceStr += ", " + resource.getKey() + "=" + resource.getValue();
}
return "ResourceProfile{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
'}';
}

public static ResourceProfile fromResourceSpec(
ResourceSpec resourceSpec) {
Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1);
double gpu = resourceSpec.getGPUResource();
if (gpu > 0) {
extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu));
}
ResourceProfile resourceProfile = new ResourceProfile(
resourceSpec.getCpuCores(),
resourceSpec.getHeapMemory(),
resourceSpec.getDirectMemory(),
resourceSpec.getNativeMemory(),
extendResource
);
return resourceProfile;
}
}
Loading

0 comments on commit 5643d15

Please sign in to comment.