Skip to content

Commit

Permalink
[FLINK-17407] Forward extended resource request to YARN.
Browse files Browse the repository at this point in the history
  • Loading branch information
KarmaGYZ authored and tillrohrmann committed May 17, 2020
1 parent d4d3747 commit 2cff3d2
Show file tree
Hide file tree
Showing 5 changed files with 411 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.hadoop.yarn.api.records.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
* Only supported in Hadoop 3.0+ or 2.10+.
*/
class ResourceInformationReflector {

private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);

static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();

/** Class used to set the extended resource. */
private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";

/** Could be Null iff isYarnResourceTypesAvailable is false. */
@Nullable
private final Method resourceSetResourceInformationMethod;

/** Could be Null iff isYarnResourceTypesAvailable is false. */
@Nullable
private final Method resourceGetResourcesMethod;

/** Could be Null iff isYarnResourceTypesAvailable is false. */
@Nullable
private final Method resourceInformationGetNameMethod;

/** Could be Null iff isYarnResourceTypesAvailable is false. */
@Nullable
private final Method resourceInformationGetValueMethod;

/** Could be Null iff isYarnResourceTypesAvailable is false. */
@Nullable
private final Method resourceInformationNewInstanceMethod;

private final boolean isYarnResourceTypesAvailable;

private ResourceInformationReflector() {
this(Resource.class.getName(), RESOURCE_INFO_CLASS);
}

@VisibleForTesting
ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
Method resourceSetResourceInformationMethod = null;
Method resourceGetResourcesMethod = null;
Method resourceInformationGetNameMethod = null;
Method resourceInformationGetValueMethod = null;
Method resourceInformationNewInstanceMethod = null;
boolean isYarnResourceTypesAvailable = false;
try {
final Class<?> resourceClass = Class.forName(resourceClassName);
final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
resourceGetResourcesMethod = resourceClass.getMethod("getResources");
resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
isYarnResourceTypesAvailable = true;
} catch (Exception e) {
LOG.debug("The underlying Yarn version does not support external resources.", e);
} finally {
this.resourceSetResourceInformationMethod = resourceSetResourceInformationMethod;
this.resourceGetResourcesMethod = resourceGetResourcesMethod;
this.resourceInformationGetNameMethod = resourceInformationGetNameMethod;
this.resourceInformationGetValueMethod = resourceInformationGetValueMethod;
this.resourceInformationNewInstanceMethod = resourceInformationNewInstanceMethod;
this.isYarnResourceTypesAvailable = isYarnResourceTypesAvailable;
}
}

/**
* Add the given resourceName and value to the {@link Resource}.
*/
void setResourceInformation(Resource resource, String resourceName, long amount) {
setResourceInformationUnSafe(resource, resourceName, amount);
}

/**
* Same as {@link #setResourceInformation(Resource, String, long)} but
* allows to pass objects that are not of type {@link Resource}.
*/
@VisibleForTesting
void setResourceInformationUnSafe(Object resource, String resourceName, long amount) {
if (!isYarnResourceTypesAvailable) {
LOG.info("Will not request extended resource {} because the used YARN version does not support it.", resourceName);
return;
}
try {
resourceSetResourceInformationMethod.invoke(
resource,
resourceName,
resourceInformationNewInstanceMethod.invoke(null, resourceName, amount));
} catch (Exception e) {
LOG.warn("Error in setting the external resource {}. Will not request this resource from YARN.", resourceName, e);
}
}

/**
* Get the name and value of external resources from the {@link Resource}.
*/
Map<String, Long> getExternalResources(Resource resource) {
return getExternalResourcesUnSafe(resource);
}

/**
* Same as {@link #getExternalResources(Resource)} but
* allows to pass objects that are not of type {@link Resource}.
*/
@VisibleForTesting
Map<String, Long> getExternalResourcesUnSafe(Object resource) {
if (!isYarnResourceTypesAvailable) {
return Collections.emptyMap();
}

final Map<String, Long> externalResources = new HashMap<>();
final Object[] externalResourcesInfo;
try {
externalResourcesInfo = (Object[]) resourceGetResourcesMethod.invoke(resource);
// The first two element would be cpu and mem.
for (int i = 2; i < externalResourcesInfo.length; i++) {
final String name = (String) resourceInformationGetNameMethod.invoke(externalResourcesInfo[i]);
final long value = (long) resourceInformationGetValueMethod.invoke(externalResourcesInfo[i]);
externalResources.put(name, value);
}
} catch (Exception e) {
LOG.warn("Could not obtain the external resources supported by the given Resource.", e);
return Collections.emptyMap();
}
return externalResources;
}

/**
* Get the name and value of all resources from the {@link Resource}.
*/
@VisibleForTesting
Map<String, Long> getAllResourceInfos(Object resource) {
if (!isYarnResourceTypesAvailable) {
return Collections.emptyMap();
}

final Map<String, Long> externalResources = new HashMap<>();
final Object[] externalResourcesInfo;
try {
externalResourcesInfo = (Object[]) resourceGetResourcesMethod.invoke(resource);
for (int i = 0; i < externalResourcesInfo.length; i++) {
final String name = (String) resourceInformationGetNameMethod.invoke(externalResourcesInfo[i]);
final long value = (long) resourceInformationGetValueMethod.invoke(externalResourcesInfo[i]);
externalResources.put(name, value);
}
} catch (Exception e) {
LOG.warn("Could not obtain the external resources supported by the given Resource.", e);
return Collections.emptyMap();
}
return externalResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
Expand All @@ -50,6 +51,7 @@ class WorkerSpecContainerResourceAdapter {
private final int maxMemMB;
private final int minVcore;
private final int maxVcore;
private final Map<String, Long> externalResourceConfigs;
private final Map<WorkerResourceSpec, InternalContainerResource> workerSpecToContainerResource;
private final Map<InternalContainerResource, Set<WorkerResourceSpec>> containerResourceToWorkerSpecs;
private final Map<Integer, Set<InternalContainerResource>> containerMemoryToContainerResource;
Expand All @@ -59,12 +61,14 @@ class WorkerSpecContainerResourceAdapter {
final int minMemMB,
final int minVcore,
final int maxMemMB,
final int maxVcore) {
final int maxVcore,
final Map<String, Long> externalResourceConfigs) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.minMemMB = minMemMB;
this.minVcore = minVcore;
this.maxMemMB = maxMemMB;
this.maxVcore = maxVcore;
this.externalResourceConfigs = Preconditions.checkNotNull(externalResourceConfigs);
workerSpecToContainerResource = new HashMap<>();
containerResourceToWorkerSpecs = new HashMap<>();
containerMemoryToContainerResource = new HashMap<>();
Expand Down Expand Up @@ -118,7 +122,8 @@ private InternalContainerResource createAndMapContainerResource(final WorkerReso
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
final InternalContainerResource internalContainerResource = new InternalContainerResource(
normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore));
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore),
externalResourceConfigs);

if (resourceWithinMaxAllocation(internalContainerResource)) {
containerResourceToWorkerSpecs.computeIfAbsent(internalContainerResource, ignored -> new HashSet<>())
Expand All @@ -129,7 +134,7 @@ private InternalContainerResource createAndMapContainerResource(final WorkerReso
} else {
LOG.warn("Requested container resource {} exceeds yarn max allocation {}. Will not allocate resource.",
internalContainerResource,
new InternalContainerResource(maxMemMB, maxVcore));
new InternalContainerResource(maxMemMB, maxVcore, Collections.emptyMap()));
return null;
}
}
Expand All @@ -145,6 +150,12 @@ private boolean resourceWithinMaxAllocation(final InternalContainerResource reso
return resource.memory <= maxMemMB && resource.vcores <= maxVcore;
}

private static void trySetExternalResources(Map<String, Long> externalResources, Resource resource) {
for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) {
ResourceInformationReflector.INSTANCE.setResourceInformation(resource, externalResource.getKey(), externalResource.getValue());
}
}

enum MatchingStrategy {
MATCH_VCORE,
IGNORE_VCORE
Expand All @@ -158,20 +169,25 @@ enum MatchingStrategy {
private static final class InternalContainerResource {
private final int memory;
private final int vcores;
private final Map<String, Long> externalResources;

private InternalContainerResource(final int memory, final int vcores) {
private InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
this.memory = memory;
this.vcores = vcores;
this.externalResources = externalResources;
}

private InternalContainerResource(final Resource resource) {
this(
Preconditions.checkNotNull(resource).getMemory(),
Preconditions.checkNotNull(resource).getVirtualCores());
Preconditions.checkNotNull(resource).getVirtualCores(),
ResourceInformationReflector.INSTANCE.getExternalResources(resource));
}

private Resource toResource() {
return Resource.newInstance(memory, vcores);
final Resource resource = Resource.newInstance(memory, vcores);
trySetExternalResources(externalResources, resource);
return resource;
}

@Override
Expand All @@ -180,21 +196,38 @@ public boolean equals(Object obj) {
return true;
} else if (obj instanceof InternalContainerResource) {
final InternalContainerResource other = (InternalContainerResource) obj;
return this.memory == other.memory && this.vcores == other.vcores;
return this.memory == other.memory && this.vcores == other.vcores && this.externalResources.equals(other.externalResources);
}
return false;
}

@Override
public int hashCode() {
final int prime = 31;
int result = Integer.hashCode(memory);
result = 31 * result + Integer.hashCode(vcores);
result = prime * result + Integer.hashCode(vcores);
result = prime * result + externalResources.hashCode();
return result;
}

@Override
public String toString() {
return "<memory:" + memory + ", vCores:" + vcores + ">";
final StringBuilder sb = new StringBuilder();

sb.append("<memory:")
.append(memory)
.append(", vCores:")
.append(vcores);

final Set<String> externalResourceNames = new TreeSet<>(externalResources.keySet());
for (String externalResourceName : externalResourceNames) {
sb.append(", ")
.append(externalResourceName).append(": ")
.append(externalResources.get(externalResourceName));
}

sb.append(">");
return sb.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
Expand Down Expand Up @@ -183,7 +184,8 @@ public YarnResourceManager(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
yarnConfig.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES));
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(log);

this.matchingStrategy = flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
Expand Down
Loading

0 comments on commit 2cff3d2

Please sign in to comment.