Skip to content

Commit

Permalink
[FLINK-20860][core] Update valid names for TaskManagerOptions#MANAGED…
Browse files Browse the repository at this point in the history
…_MEMORY_CONSUMER_WEIGHTS.

This closes apache#14576
  • Loading branch information
xintongsong committed Jan 8, 2021
1 parent 004e374 commit ed354d9
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 76 deletions.
4 changes: 2 additions & 2 deletions docs/_includes/generated/common_memory_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@
</tr>
<tr>
<td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
<td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
<td style="word-wrap: break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
<td>Map</td>
<td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are DATAPROC (for RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON (for Python processes).</td>
<td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are OPERATOR (for built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for Python processes).</td>
</tr>
<tr>
<td><h5>taskmanager.memory.managed.fraction</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
</tr>
<tr>
<td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
<td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
<td style="word-wrap: break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
<td>Map</td>
<td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are DATAPROC (for RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON (for Python processes).</td>
<td>Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are OPERATOR (for built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for Python processes).</td>
</tr>
<tr>
<td><h5>taskmanager.memory.managed.fraction</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@
@ConfigGroups(groups = @ConfigGroup(name = "TaskManagerMemory", keyPrefix = "taskmanager.memory"))
public class TaskManagerOptions {

public static final String MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = "DATAPROC";
/**
* @deprecated use {@link #MANAGED_MEMORY_CONSUMER_NAME_OPERATOR} and {@link
* #MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND} instead
*/
@Deprecated public static final String MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = "DATAPROC";

public static final String MANAGED_MEMORY_CONSUMER_NAME_OPERATOR = "OPERATOR";
public static final String MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND = "STATE_BACKEND";
public static final String MANAGED_MEMORY_CONSUMER_NAME_PYTHON = "PYTHON";

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -433,17 +440,21 @@ public class TaskManagerOptions {
.defaultValue(
new HashMap<String, String>() {
{
put(MANAGED_MEMORY_CONSUMER_NAME_DATAPROC, "70");
put(MANAGED_MEMORY_CONSUMER_NAME_OPERATOR, "70");
put(MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND, "70");
put(MANAGED_MEMORY_CONSUMER_NAME_PYTHON, "30");
}
})
.withDescription(
"Managed memory weights for different kinds of consumers. A slot’s managed memory is"
+ " shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless"
+ " of the number of consumers from each kind. Currently supported kinds of consumers are "
+ MANAGED_MEMORY_CONSUMER_NAME_DATAPROC
+ " (for RocksDB state backend in streaming and built-in"
+ " algorithms in batch) and "
"Managed memory weights for different kinds of consumers. A slot’s"
+ " managed memory is shared by all kinds of consumers it"
+ " contains, proportionally to the kinds’ weights and"
+ " regardless of the number of consumers from each kind."
+ " Currently supported kinds of consumers are "
+ MANAGED_MEMORY_CONSUMER_NAME_OPERATOR
+ " (for built-in algorithms), "
+ MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND
+ " (for RocksDB state backend) and "
+ MANAGED_MEMORY_CONSUMER_NAME_PYTHON
+ " (for Python processes).");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@
package org.apache.flink.runtime.util.config.memory;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.state.StateBackendLoader;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Utils for configuration and calculations related to managed memory and its various use cases. */
public enum ManagedMemoryUtils {
Expand All @@ -44,11 +47,20 @@ public enum ManagedMemoryUtils {

private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;

/** Valid names of managed memory consumers. */
private static final String[] MANAGED_MEMORY_CONSUMER_VALID_NAMES = {
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON
};
/** Names of managed memory use cases, in the fallback order. */
@SuppressWarnings("deprecation")
private static final Map<ManagedMemoryUseCase, List<String>> USE_CASE_CONSUMER_NAMES =
ImmutableMap.of(
ManagedMemoryUseCase.OPERATOR,
ImmutableList.of(
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
ManagedMemoryUseCase.STATE_BACKEND,
ImmutableList.of(
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND,
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
ManagedMemoryUseCase.PYTHON,
ImmutableList.of(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON));

public static double convertToFractionOfSlot(
ManagedMemoryUseCase useCase,
Expand Down Expand Up @@ -86,52 +98,53 @@ public static double convertToFractionOfSlot(
@VisibleForTesting
static Map<ManagedMemoryUseCase, Integer> getManagedMemoryUseCaseWeightsFromConfig(
Configuration config) {
final Map<String, String> consumerWeights =
final Map<String, String> configuredWeights =
config.get(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS);
final Map<ManagedMemoryUseCase, Integer> effectiveWeights = new HashMap<>();

for (Map.Entry<ManagedMemoryUseCase, List<String>> entry :
USE_CASE_CONSUMER_NAMES.entrySet()) {
final ManagedMemoryUseCase useCase = entry.getKey();
final Iterator<String> nameIter = entry.getValue().iterator();

boolean findWeight = false;
while (!findWeight && nameIter.hasNext()) {
final String name = nameIter.next();
final String weightStr = configuredWeights.get(name);
if (weightStr != null) {
final int weight = Integer.parseInt(weightStr);
findWeight = true;

if (weight < 0) {
throw new IllegalConfigurationException(
String.format(
"Managed memory weight should not be negative. Configured "
+ "weight for %s is %d.",
useCase, weight));
}

if (weight == 0) {
LOG.debug(
"Managed memory consumer weight for {} is configured to 0. Jobs "
+ "containing this type of managed memory consumers may "
+ "fail due to not being able to allocate managed memory.",
useCase);
}

effectiveWeights.put(useCase, weight);
}
}

for (String consumer : MANAGED_MEMORY_CONSUMER_VALID_NAMES) {
if (!consumerWeights.containsKey(consumer)) {
if (!findWeight) {
LOG.debug(
"Managed memory consumer weight for {} is not configured. Jobs containing this type of "
+ "managed memory consumers may fail due to not being able to allocate managed memory.",
consumer);
"Managed memory consumer weight for {} is not configured. Jobs containing "
+ "this type of managed memory consumers may fail due to not being "
+ "able to allocate managed memory.",
useCase);
}
}

return consumerWeights.entrySet().stream()
.flatMap(
(entry) -> {
final String consumer = entry.getKey();
final int weight = Integer.parseInt(entry.getValue());

if (weight < 0) {
throw new IllegalConfigurationException(
String.format(
"Managed memory weight should not be negative. Configured weight for %s is %d.",
consumer, weight));
}

if (weight == 0) {
LOG.debug(
"Managed memory consumer weight for {} is configured to 0. Jobs containing this type of "
+ "managed memory consumers may fail due to not being able to allocate managed memory.",
consumer);
}

switch (consumer) {
case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC:
return Stream.of(
Tuple2.of(ManagedMemoryUseCase.OPERATOR, weight),
Tuple2.of(ManagedMemoryUseCase.STATE_BACKEND, weight));
case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON:
return Stream.of(
Tuple2.of(ManagedMemoryUseCase.PYTHON, weight));
default:
throw new IllegalConfigurationException(
"Unknown managed memory consumer: " + consumer);
}
})
.collect(Collectors.toMap((tuple2) -> tuple2.f0, (tuple2) -> tuple2.f1));
return effectiveWeights;
}

public static double getFractionRoundedDown(final long dividend, final long divisor) {
Expand Down
Loading

0 comments on commit ed354d9

Please sign in to comment.