Skip to content

Commit

Permalink
[FLINK-7928] Move Resource out of ResourceSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Dec 14, 2017
1 parent 5643d15 commit 76e3156
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.apache.flink.api.common.resources.GPUResource;
import org.apache.flink.api.common.resources.Resource;

import javax.annotation.Nonnull;

Expand All @@ -28,8 +29,6 @@
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Describe the different resource factors of the operator with UDF.
*
Expand Down Expand Up @@ -97,7 +96,7 @@ protected ResourceSpec(
this.stateSizeInMB = stateSizeInMB;
for (Resource resource : extendedResources) {
if (resource != null) {
this.extendedResources.put(resource.name, resource);
this.extendedResources.put(resource.getName(), resource);
}
}
}
Expand All @@ -118,7 +117,7 @@ public ResourceSpec merge(ResourceSpec other) {
this.stateSizeInMB + other.stateSizeInMB);
target.extendedResources.putAll(extendedResources);
for (Resource resource : other.extendedResources.values()) {
target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2));
target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
}
return target;
}
Expand Down Expand Up @@ -148,9 +147,14 @@ public double getGPUResource() {
if (gpuResource != null) {
return gpuResource.getValue();
}

return 0.0;
}

public Map<String, Resource> getExtendedResources() {
return extendedResources;
}

/**
* Check whether all the field values are valid.
*
Expand All @@ -160,7 +164,7 @@ public boolean isValid() {
if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
for (Resource resource : extendedResources.values()) {
if (resource.value < 0) {
if (resource.getValue() < 0) {
return false;
}
}
Expand All @@ -185,9 +189,9 @@ public boolean lessThanOrEqual(@Nonnull ResourceSpec other) {
int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
for (Resource resource : extendedResources.values()) {
if (!other.extendedResources.containsKey(resource.name) ||
!other.extendedResources.get(resource.name).type.equals(resource.type) ||
other.extendedResources.get(resource.name).value < resource.value) {
if (!other.extendedResources.containsKey(resource.getName()) ||
other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) {
return false;
}
}
Expand Down Expand Up @@ -229,7 +233,7 @@ public int hashCode() {
public String toString() {
String extend = "";
for (Resource resource : extendedResources.values()) {
extend += ", " + resource.name + "=" + resource.value;
extend += ", " + resource.getName() + "=" + resource.getValue();
}
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
Expand Down Expand Up @@ -297,121 +301,4 @@ public ResourceSpec build() {
}
}

/**
* Base class for additional resources one can specify.
*/
public abstract static class Resource implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Enum defining how resources are aggregated.
*/
public enum ResourceAggregateType {
/**
* Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
*/
AGGREGATE_TYPE_SUM,

/**
* Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
*/
AGGREGATE_TYPE_MAX
}

private final String name;

private final double value;

private final ResourceAggregateType type;

public Resource(String name, double value, ResourceAggregateType type) {
this.name = checkNotNull(name);
this.value = value;
this.type = checkNotNull(type);
}

Resource merge(Resource other) {
Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");

Double value = null;
switch (type) {
case AGGREGATE_TYPE_MAX :
value = Math.max(other.value, this.value);
break;

case AGGREGATE_TYPE_SUM:
default:
value = this.value + other.value;
}

Resource resource = create(value, type);
return resource;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && getClass() == o.getClass()) {
Resource other = (Resource) o;

return name.equals(other.name) && type.equals(other.type) && value == other.value;
} else {
return false;
}
}

@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + type.ordinal();
result = 31 * result + (int) value;
return result;
}

public String getName() {
return this.name;
}

public ResourceAggregateType getAggregateType() {
return this.type;
}

public double getValue() {
return this.value;
}

/**
* Create a resource of the same resource type.
*
* @param value The value of the resource
* @param type The aggregate type of the resource
* @return A new instance of the sub resource
*/
protected abstract Resource create(double value, ResourceAggregateType type);
}

/**
* The GPU resource.
*/
public static class GPUResource extends Resource {

private static final long serialVersionUID = -2276080061777135142L;

public GPUResource(double value) {
this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
}

public GPUResource(double value, ResourceAggregateType type) {
super(GPU_NAME, value, type);
}

@Override
public Resource create(double value, ResourceAggregateType type) {
return new GPUResource(value, type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.resources;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ResourceSpec;

/**
* The GPU resource.
*/
@Internal
public class GPUResource extends Resource {

private static final long serialVersionUID = -2276080061777135142L;

public GPUResource(double value) {
this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
}

private GPUResource(double value, ResourceAggregateType type) {
super(ResourceSpec.GPU_NAME, value, type);
}

@Override
public Resource create(double value, ResourceAggregateType type) {
return new GPUResource(value, type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.resources;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Base class for resources one can specify.
*/
@Internal
public abstract class Resource implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Enum defining how resources are aggregated.
*/
public enum ResourceAggregateType {
/**
* Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
*/
AGGREGATE_TYPE_SUM,

/**
* Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
*/
AGGREGATE_TYPE_MAX
}

private final String name;

private final double value;

private final ResourceAggregateType resourceAggregateType;

protected Resource(String name, double value, ResourceAggregateType type) {
this.name = checkNotNull(name);
this.value = value;
this.resourceAggregateType = checkNotNull(type);
}

public Resource merge(Resource other) {
Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource resourceAggregateType");
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType");

final double aggregatedValue;
switch (resourceAggregateType) {
case AGGREGATE_TYPE_MAX :
aggregatedValue = Math.max(other.value, this.value);
break;

case AGGREGATE_TYPE_SUM:
default:
aggregatedValue = this.value + other.value;
}

return create(aggregatedValue, resourceAggregateType);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && getClass() == o.getClass()) {
Resource other = (Resource) o;

return name.equals(other.name) && resourceAggregateType == other.resourceAggregateType && value == other.value;
} else {
return false;
}
}

@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + resourceAggregateType.ordinal();
result = 31 * result + (int) value;
return result;
}

public String getName() {
return name;
}

public ResourceAggregateType getResourceAggregateType() {
return this.resourceAggregateType;
}

public double getValue() {
return this.value;
}

/**
* Create a resource of the same resource resourceAggregateType.
*
* @param value The value of the resource
* @param type The aggregate resourceAggregateType of the resource
* @return A new instance of the sub resource
*/
protected abstract Resource create(double value, ResourceAggregateType type);
}
Loading

0 comments on commit 76e3156

Please sign in to comment.