Skip to content

Commit

Permalink
[FLINK-16438][yarn] Introduce WorkerSpecContainerResourceAdapter for …
Browse files Browse the repository at this point in the history
…converting between Flink WorkerResourceSpec and Yarn container Resource in YarnResourceManager.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent 8380081 commit a91057f
Show file tree
Hide file tree
Showing 4 changed files with 398 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.yarn.api.records.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}.
*/
public class WorkerSpecContainerResourceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);

private final Configuration flinkConfig;
private final int minMemMB;
private final int maxMemMB;
private final int minVcore;
private final int maxVcore;
private final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy;
private final Map<WorkerResourceSpec, Resource> workerSpecToContainerResource;
private final Map<Resource, Set<WorkerResourceSpec>> containerResourceToWorkerSpecs;
private final Map<Integer, Set<Resource>> containerMemoryToContainerResource;

WorkerSpecContainerResourceAdapter(
final Configuration flinkConfig,
final int minMemMB,
final int minVcore,
final int maxMemMB,
final int maxVcore,
final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.minMemMB = minMemMB;
this.minVcore = minVcore;
this.maxMemMB = maxMemMB;
this.maxVcore = maxVcore;
this.matchingStrategy = matchingStrategy;
workerSpecToContainerResource = new HashMap<>();
containerResourceToWorkerSpecs = new HashMap<>();
containerMemoryToContainerResource = new HashMap<>();
}

@VisibleForTesting
Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
Preconditions.checkNotNull(workerResourceSpec),
this::createAndMapContainerResource));
}

@VisibleForTesting
Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource) {
return getEquivalentContainerResource(containerResource).stream()
.flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
.collect(Collectors.toSet());
}

@VisibleForTesting
Set<Resource> getEquivalentContainerResource(final Resource containerResource) {
// Yarn might ignore the requested vcores, depending on its configurations.
// In such cases, we should also not matching vcores.
final Set<Resource> equivalentContainerResources;
switch (matchingStrategy) {
case MATCH_VCORE:
equivalentContainerResources = Collections.singleton(containerResource);
break;
case IGNORE_VCORE:
default:
equivalentContainerResources = containerMemoryToContainerResource
.getOrDefault(containerResource.getMemory(), Collections.emptySet());
break;
}
return equivalentContainerResources;
}

@Nullable
private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) {
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
final Resource containerResource = Resource.newInstance(
normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), minMemMB),
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), minVcore));

if (resourceWithinMaxAllocation(containerResource)) {
containerResourceToWorkerSpecs.computeIfAbsent(containerResource, ignored -> new HashSet<>())
.add(workerResourceSpec);
containerMemoryToContainerResource.computeIfAbsent(containerResource.getMemory(), ignored -> new HashSet<>())
.add(containerResource);
return containerResource;
} else {
LOG.warn("Requested container resource {} exceeds yarn max allocation {}. Will not allocate resource.",
containerResource,
Resource.newInstance(maxMemMB, maxVcore));
return null;
}
}

/**
* Normalize to the minimum integer that is greater or equal to 'value' and is integer multiple of 'unitValue'.
*/
private int normalize(final int value, final int unitValue) {
return MathUtils.divideRoundUp(value, unitValue) * unitValue;
}

boolean resourceWithinMaxAllocation(final Resource resource) {
return resource.getMemory() <= maxMemMB && resource.getVirtualCores() <= maxVcore;
}

enum MatchingStrategy {
MATCH_VCORE,
IGNORE_VCORE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
Expand Down Expand Up @@ -121,6 +122,8 @@ public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>

private final Resource resource;

private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter;

public YarnResourceManager(
RpcService rpcService,
ResourceID resourceId,
Expand Down Expand Up @@ -168,6 +171,24 @@ public YarnResourceManager(

this.webInterfaceUrl = webInterfaceUrl;
this.resource = Resource.newInstance(defaultMemoryMB, defaultTaskExecutorProcessSpec.getCpuCores().getValue().intValue());

this.workerSpecContainerResourceAdapter = new WorkerSpecContainerResourceAdapter(
flinkConfig,
yarnConfig.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
yarnConfig.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
yarnConfig.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
yarnConfig.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE);
}

protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.yarn.YarnResourceManager;

import static org.apache.flink.configuration.ConfigOptions.key;

Expand All @@ -34,4 +37,27 @@ public class YarnConfigOptionsInternal {
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");

/**
* **DO NOT USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested.
*
* <p>By default, Yarn ignores vcores in the container requests, and always allocate 1 vcore for each container.
* Iff 'yarn.scheduler.capacity.resource-calculator' is set to 'DominantResourceCalculator' for Yarn, will it
* allocate container vcores as requested.
*
* <P>For Hadoop 2.6+, we can learn whether Yarn matches vcores from
* {@link org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse}. However, this is not
* available to earlier Hadoop versions (pre 2.6). Therefore, for earlier Hadoop versions, the user needs to make
* sure this configuration option is consistent with the Yarn setup.
*
* <p>ATM, it should be fine to keep this option 'false', because with the current {@link SlotManagerImpl} all the
* TM containers should have the same resources. If later we add another {@link SlotManager} implementation that may
* have TMs with different resources, and if we need it to work with pre 2.6 Hadoop versions, we can expose this
* configuration option to users.
*/
public static final ConfigOption<Boolean> MATCH_CONTAINER_VCORES =
key("$internal.yarn.resourcemanager.enable-vcore-matching")
.booleanType()
.defaultValue(false)
.withDescription("**DO NOT USE** Whether YarnResourceManager should match the container vcores.");
}
Loading

0 comments on commit a91057f

Please sign in to comment.