Skip to content

Commit

Permalink
[FLINK-8431] Allow to specify # GPUs for TaskManager in Mesos
Browse files Browse the repository at this point in the history
[FLINK-8431] Upgrade Fenzo dependency to 0.10.1

[FLINK-8431] Simplify scalar aggregation

[FLINK-8431] Offer::getScalarValues need not contain entries for cpus, mem, disk, and network

[FLINK-8431] Floor # gpus to make sure whole numbers

This closes apache#5307.
  • Loading branch information
eastcirclek authored and tillrohrmann committed Jan 30, 2018
1 parent e2c2cf4 commit b4e90fe
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 28 deletions.
2 changes: 2 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ May be set to -1 to disable this feature.

- `mesos.resourcemanager.tasks.cpus`: CPUs to assign to the Mesos workers (**DEFAULT**: 0.0)

- `mesos.resourcemanager.tasks.gpus`: GPUs to assign to the Mesos workers (**DEFAULT**: 0.0)

- `mesos.resourcemanager.tasks.container.type`: Type of the containerization used: "mesos" or "docker" (DEFAULT: mesos);

- `mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
Expand Down
2 changes: 2 additions & 0 deletions docs/ops/deployment/mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ May be set to -1 to disable this feature.

`mesos.resourcemanager.tasks.cpus`: CPUs to assign to the Mesos workers (**DEFAULT**: 0.0)

`mesos.resourcemanager.tasks.gpus`: GPUs to assign to the Mesos workers (**DEFAULT**: 0.0)

`mesos.resourcemanager.tasks.container.type`: Type of the containerization used: "mesos" or "docker" (DEFAULT: mesos);

`mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
Expand Down
2 changes: 1 addition & 1 deletion flink-mesos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ under the License.
<dependency>
<groupId>com.netflix.fenzo</groupId>
<artifactId>fenzo-core</artifactId>
<version>0.9.3</version>
<version>0.10.1</version>
<exclusions>
<!-- exclude mesos here to override -->
<exclusion>
Expand Down
14 changes: 14 additions & 0 deletions flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,20 @@ public static Protos.Resource cpus(String role, double amount) {
return scalar("cpus", role, amount);
}

/**
* Construct a gpu resource.
*/
public static Protos.Resource gpus(double amount) {
return gpus(UNRESERVED_ROLE, amount);
}

/**
* Construct a gpu resource.
*/
public static Protos.Resource gpus(String role, double amount) {
return scalar("gpus", role, amount);
}

/**
* Construct a mem resource.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ public static MesosTaskManagerParameters createTmParameters(Configuration config
log.info("TaskManagers will be created with {} task slots",
taskManagerParameters.containeredParameters().numSlots());
log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
"JVM direct memory limit {} MB, {} cpus",
"JVM direct memory limit {} MB, {} cpus, {} gpus",
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
taskManagerParameters.cpus());
taskManagerParameters.cpus(),
taskManagerParameters.gpus());

return taskManagerParameters;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public double getCPUs() {
return params.cpus();
}

public double getGPUs() {
return params.gpus();
}

@Override
public double getMemory() {
return params.containeredParameters().taskManagerTotalMemoryMB();
Expand All @@ -145,6 +149,11 @@ public int getPorts() {
return TM_PORT_KEYS.length;
}

@Override
public Map<String, Double> getScalarRequests() {
return Collections.singletonMap("gpus", (double) params.gpus());
}

@Override
public Map<String, NamedResourceSetRequest> getCustomNamedResources() {
return Collections.emptyMap();
Expand Down Expand Up @@ -174,8 +183,9 @@ public AssignedResources getAssignedResources() {
public String toString() {
return "Request{" +
"cpus=" + getCPUs() +
"memory=" + getMemory() +
'}';
", memory=" + getMemory() +
", gpus=" + getGPUs() +
"}";
}
}

Expand Down Expand Up @@ -204,6 +214,7 @@ public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation al
// take needed resources from the overall allocation, under the assumption of adequate resources
Set<String> roles = mesosConfiguration.roles();
taskInfo.addAllResources(allocation.takeScalar("cpus", taskRequest.getCPUs(), roles));
taskInfo.addAllResources(allocation.takeScalar("gpus", taskRequest.getGPUs(), roles));
taskInfo.addAllResources(allocation.takeScalar("mem", taskRequest.getMemory(), roles));

final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.netflix.fenzo.functions.Action1;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.FrameworkInfo;
import org.apache.mesos.Protos.FrameworkInfo.Capability;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;

Expand Down Expand Up @@ -168,6 +169,11 @@ protected void initialize() throws Exception {
frameworkInfo.setId(frameworkID.get());
}

if (taskManagerParameters.gpus() > 0) {
LOG.info("Add GPU_RESOURCES capability to framework");
frameworkInfo.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES));
}

MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
Expand Down Expand Up @@ -361,8 +367,9 @@ protected void requestNewWorkers(int numWorkers) {

LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());

LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus).",
launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
launchable.taskRequest().getScalarRequests().get("gpus"));

toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
toLaunch.add(launchable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID,
// create the specific TM parameters from the resource profile and some defaults
MesosTaskManagerParameters params = new MesosTaskManagerParameters(
resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
taskManagerParameters.gpus(),
taskManagerParameters.containerType(),
taskManagerParameters.containerImageName(),
new ContaineredTaskManagerParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.cpus")
.defaultValue(0.0);

public static final ConfigOption<Integer> MESOS_RM_TASKS_GPUS =
key("mesos.resourcemanager.tasks.gpus")
.defaultValue(0);

public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
key("mesos.resourcemanager.tasks.container.type")
.defaultValue("mesos");
Expand Down Expand Up @@ -105,6 +109,8 @@ public class MesosTaskManagerParameters {

private final double cpus;

private final int gpus;

private final ContainerType containerType;

private final Option<String> containerImageName;
Expand All @@ -125,6 +131,7 @@ public class MesosTaskManagerParameters {

public MesosTaskManagerParameters(
double cpus,
int gpus,
ContainerType containerType,
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
Expand All @@ -136,6 +143,7 @@ public MesosTaskManagerParameters(
Option<String> taskManagerHostname) {

this.cpus = cpus;
this.gpus = gpus;
this.containerType = Preconditions.checkNotNull(containerType);
this.containerImageName = Preconditions.checkNotNull(containerImageName);
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
Expand All @@ -154,6 +162,13 @@ public double cpus() {
return cpus;
}

/**
* Get the GPU units to use for the TaskManager Process.
*/
public int gpus() {
return gpus;
}

/**
* Get the container type (Mesos or Docker). The default is Mesos.
*
Expand Down Expand Up @@ -223,6 +238,7 @@ public Option<String> bootstrapCommand() {
public String toString() {
return "MesosTaskManagerParameters{" +
"cpus=" + cpus +
", gpus=" + gpus +
", containerType=" + containerType +
", containerImageName=" + containerImageName +
", containeredParameters=" + containeredParameters +
Expand Down Expand Up @@ -254,6 +270,13 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
cpus = Math.max(containeredParameters.numSlots(), 1.0);
}

int gpus = flinkConfig.getInteger(MESOS_RM_TASKS_GPUS);

if (gpus < 0) {
throw new IllegalConfigurationException(MESOS_RM_TASKS_GPUS.key() +
" cannot be negative");
}

// parse the containerization parameters
String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);

Expand Down Expand Up @@ -291,6 +314,7 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {

return new MesosTaskManagerParameters(
cpus,
gpus,
containerType,
Option.apply(imageName),
containeredParameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ public class Offer implements VirtualMachineLease {
private final long offeredTime;

private final List<Protos.Resource> resources;
private final Map<String, Double> aggregatedScalarResourceMap;
private final Map<String, Protos.Attribute> attributeMap;

private final double cpuCores;
private final double memoryMB;
private final double networkMbps;
private final double diskMB;

private final List<Range> portRanges;

public Offer(Protos.Offer offer) {
Expand All @@ -66,13 +68,26 @@ public Offer(Protos.Offer offer) {
this.offeredTime = System.currentTimeMillis();

List<Protos.Resource> resources = new ArrayList<>(offer.getResourcesList().size());
Map<String, List<Protos.Resource>> resourceMap = new HashMap<>();
Map<String, Double> aggregatedScalarResourceMap = new HashMap<String, Double>() {
@Override
public Double remove(Object key) {
if (super.containsKey(key)) {
return super.remove(key);
} else {
return 0.0;
}
}
};
Map<String, List<Protos.Resource>> rangesResourceMap = new HashMap<>();
for (Protos.Resource resource : offer.getResourcesList()) {
switch (resource.getType()) {
case SCALAR:
resources.add(resource);
aggregatedScalarResourceMap.merge(resource.getName(), resource.getScalar().getValue(), Double::sum);
break;
case RANGES:
resources.add(resource);
resourceMap.computeIfAbsent(resource.getName(), k -> new ArrayList<>(2)).add(resource);
rangesResourceMap.computeIfAbsent(resource.getName(), k -> new ArrayList<>(2)).add(resource);
break;
default:
logger.debug("Unknown resource type " + resource.getType() + " for resource " + resource.getName() +
Expand All @@ -81,11 +96,12 @@ public Offer(Protos.Offer offer) {
}
this.resources = Collections.unmodifiableList(resources);

this.cpuCores = aggregateScalarResource(resourceMap, "cpus");
this.memoryMB = aggregateScalarResource(resourceMap, "mem");
this.networkMbps = aggregateScalarResource(resourceMap, "network");
this.diskMB = aggregateScalarResource(resourceMap, "disk");
this.portRanges = Collections.unmodifiableList(aggregateRangesResource(resourceMap, "ports"));
this.cpuCores = aggregatedScalarResourceMap.remove("cpus");
this.memoryMB = aggregatedScalarResourceMap.remove("mem");
this.networkMbps = aggregatedScalarResourceMap.remove("network");
this.diskMB = aggregatedScalarResourceMap.remove("disk");
this.aggregatedScalarResourceMap = Collections.unmodifiableMap(aggregatedScalarResourceMap);
this.portRanges = Collections.unmodifiableList(aggregateRangesResource(rangesResourceMap, "ports"));

if (offer.getAttributesCount() > 0) {
Map<String, Protos.Attribute> attributeMap = new HashMap<>();
Expand Down Expand Up @@ -117,6 +133,10 @@ public double cpuCores() {
return cpuCores;
}

public double gpus() {
return getScalarValue("gpus");
}

@Override
public double memoryMB() {
return memoryMB;
Expand Down Expand Up @@ -156,6 +176,16 @@ public Map<String, Protos.Attribute> getAttributeMap() {
return attributeMap;
}

@Override
public Double getScalarValue(String name) {
return aggregatedScalarResourceMap.getOrDefault(name, 0.0);
}

@Override
public Map<String, Double> getScalarValues() {
return aggregatedScalarResourceMap;
}

@Override
public String toString() {
return "Offer{" +
Expand All @@ -168,13 +198,6 @@ public String toString() {
'}';
}

private static double aggregateScalarResource(Map<String, List<Protos.Resource>> resourceMap, String resourceName) {
if (resourceMap.get(resourceName) == null) {
return 0.0;
}
return resourceMap.get(resourceName).stream().mapToDouble(r -> r.getScalar().getValue()).sum();
}

private static List<Range> aggregateRangesResource(Map<String, List<Protos.Resource>> resourceMap, String resourceName) {
if (resourceMap.get(resourceName) == null) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public static void logMesosConfig(Logger log, MesosConfiguration config) {
log.info(" Name: {}", info.hasName() ? info.getName() : "(none)");
log.info(" Failover Timeout (secs): {}", info.getFailoverTimeout());
log.info(" Role: {}", info.hasRole() ? info.getRole() : "(none)");
log.info(" Capabilities: {}",
info.getCapabilitiesList().size() > 0 ? info.getCapabilitiesList() : "(none)");
log.info(" Principal: {}", info.hasPrincipal() ? info.getPrincipal() : "(none)");
log.info(" Host: {}", info.hasHostname() ? info.getHostname() : "(none)");
if (env.containsKey("LIBPROCESS_IP")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,15 @@ class LaunchCoordinator(
case Event(offers: ResourceOffers, data: GatherData) =>
val leases = offers.offers().asScala.map(new Offer(_))
if(LOG.isInfoEnabled) {
val (cpus, mem) = leases.foldLeft((0.0,0.0)) {
(z,o) => (z._1 + o.cpuCores(), z._2 + o.memoryMB())
val (cpus, gpus, mem) = leases.foldLeft((0.0,0.0,0.0)) {
(z,o) => (z._1 + o.cpuCores(), z._2 + o.gpus(), z._3 + o.memoryMB())
}
LOG.info(s"Received offer(s) of $mem MB, $cpus cpus:")
LOG.info(s"Received offer(s) of $mem MB, $cpus cpus, $gpus gpus:")
for(l <- leases) {
val reservations = l.getResources.asScala.map(_.getRole).toSet
LOG.info(
s" ${l.getId} from ${l.hostname()} of ${l.memoryMB()} MB, ${l.cpuCores()} cpus" +
s" ${l.getId} from ${l.hostname()} of ${l.memoryMB()} MB," +
s" ${l.cpuCores()} cpus, ${l.gpus()} gpus" +
s" for ${reservations.mkString("[", ",", "]")}")
}
}
Expand All @@ -178,7 +179,8 @@ class LaunchCoordinator(
LOG.info("Resources considered: (note: expired offers not deducted from below)")
for(vm <- optimizer.getVmCurrentStates.asScala) {
val lease = vm.getCurrAvailableResources
LOG.info(s" ${vm.getHostname} has ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus")
LOG.info(s" ${vm.getHostname} has ${lease.memoryMB()} MB," +
s" ${lease.cpuCores()} cpus, ${lease.getScalarValue("gpus")} gpus")
}
}
log.debug(result.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void initialize() {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0,
1.0, 1,
MesosTaskManagerParameters.ContainerType.MESOS,
Option.<String>empty(),
containeredParams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ static class Context implements AutoCloseable {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
1.0, 1, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(),
Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
Option.<String>empty());
Expand Down
Loading

0 comments on commit b4e90fe

Please sign in to comment.