[FLINK-23707][streaming-java] Use consistent managed memory weights for StreamNode

This closes apache#16771.
twalthr committed Aug 12, 2021
1 parent b4c524b commit 3e62364
Showing 9 changed files with 112 additions and 20 deletions.
@@ -273,7 +273,8 @@ public ResourceSpec getPreferredResources() {
* @param managedMemoryUseCase The use case that this transformation declares needing managed
* memory for.
* @param weight Use-case-specific weights for this transformation. Used for sharing managed
* memory across transformations for OPERATOR scope use cases.
* memory across transformations for OPERATOR scope use cases. Check the individual {@link
* ManagedMemoryUseCase} for the specific weight definition.
* @return The previous weight, if it exists.
*/
public Optional<Integer> declareManagedMemoryUseCaseAtOperatorScope(
@@ -313,7 +314,8 @@ protected void updateManagedMemoryStateBackendUseCase(boolean hasStateBackend) {
/**
* Get operator scope use cases that this transformation needs managed memory for, and the
* use-case-specific weights for this transformation. The weights are used for sharing managed
* memory across transformations for the use cases.
* memory across transformations for the use cases. Check the individual {@link
* ManagedMemoryUseCase} for the specific weight definition.
*/
public Map<ManagedMemoryUseCase, Integer> getManagedMemoryOperatorScopeUseCaseWeights() {
return Collections.unmodifiableMap(managedMemoryOperatorScopeUseCaseWeights);
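Editor's note: as a minimal usage sketch (not part of this commit; the source stream and the 128 MiB value are only illustrative), an operator-scope weight could be declared through this API as follows, with the weight for ManagedMemoryUseCase.OPERATOR now being a mebibyte value:

// Illustrative sketch: declaring an operator-scope managed memory weight on a transformation.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Transformation<Integer> transformation = env.fromElements(1, 2, 3).getTransformation();
Optional<Integer> previousWeight =
        transformation.declareManagedMemoryUseCaseAtOperatorScope(
                ManagedMemoryUseCase.OPERATOR, 128); // weight hint of 128 MiB
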
@@ -99,7 +99,7 @@ public class ExecutionOptions {
.build());

@Documentation.ExcludeFromDocumentation(
"This is an expert option, that we do not want to expose in" + " the documentation")
"This is an expert option, that we do not want to expose in the documentation")
public static final ConfigOption<Boolean> SORT_INPUTS =
ConfigOptions.key("execution.sorted-inputs.enabled")
.booleanType()
@@ -109,7 +109,21 @@ public class ExecutionOptions {
+ "NOTE: It takes effect only in the BATCH runtime mode.");

@Documentation.ExcludeFromDocumentation(
"This is an expert option, that we do not want to expose in" + " the documentation")
"This is an expert option, that we do not want to expose in the documentation")
public static final ConfigOption<MemorySize> SORTED_INPUTS_MEMORY =
ConfigOptions.key("execution.sorted-inputs.memory")
.memoryType()
// in sync with other weights from Table API and DataStream API
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription(
"Sets the managed memory size for sorting inputs of keyed operators in "
+ "BATCH runtime mode. The memory size is only a weight hint. "
+ "Thus, it will affect the operator's memory weight within a "
+ "task, but the actual memory used depends on the running "
+ "environment.");

@Documentation.ExcludeFromDocumentation(
"This is an expert option, that we do not want to expose in the documentation")
public static final ConfigOption<Boolean> USE_BATCH_STATE_BACKEND =
ConfigOptions.key("execution.batch-state-backend.enabled")
.booleanType()
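Editor's note: a hedged usage sketch (not part of this commit) of how the new expert option execution.sorted-inputs.memory could be overridden via the environment configuration; the 256 MiB value is only illustrative:

// Illustrative sketch: raising the sort memory weight hint used in BATCH runtime mode.
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.ofMebiBytes(256));
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
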
@@ -24,8 +24,12 @@
/** Use cases of managed memory. */
@Internal
public enum ManagedMemoryUseCase {

/** Currently, weights are defined as mebibyte values. */
OPERATOR(Scope.OPERATOR),

STATE_BACKEND(Scope.SLOT),

PYTHON(Scope.SLOT);

public final Scope scope;
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.translators;

import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamNode;
@@ -61,12 +62,18 @@ static void applyBatchExecutionSettings(
node.addInputRequirement(i, inputRequirements[i]);
}
Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = new HashMap<>();
operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.OPERATOR, 1);
operatorScopeUseCaseWeights.put(
ManagedMemoryUseCase.OPERATOR,
deriveMemoryWeight(context.getGraphGeneratorConfig()));
node.setManagedMemoryUseCaseWeights(
operatorScopeUseCaseWeights, Collections.emptySet());
}
}

private static int deriveMemoryWeight(ReadableConfig configuration) {
return Math.max(1, configuration.get(ExecutionOptions.SORTED_INPUTS_MEMORY).getMebiBytes());
}

@SuppressWarnings("rawtypes")
private static boolean isInputSelectable(StreamNode node) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
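Editor's note: a small illustration (not part of this commit) of the clamping in deriveMemoryWeight above — a configured value below one mebibyte still yields a weight of 1:

// Illustrative sketch: values below 1 MiB are clamped to a weight of 1.
Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.parse("512 kb"));
int weight =
        Math.max(1, configuration.get(ExecutionOptions.SORTED_INPUTS_MEMORY).getMebiBytes());
// weight == 1, since getMebiBytes() truncates 512 KiB down to 0 MiB
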
@@ -30,6 +30,8 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -66,7 +68,9 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -117,6 +121,66 @@ public void testBatchJobType() {
assertThat(graph.getJobType(), is(JobType.BATCH));
}

@Test
public void testManagedMemoryWeights() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> process =
env.fromElements(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());

StreamGraphGenerator graphGenerator =
new StreamGraphGenerator(
Collections.singletonList(sink.getTransformation()),
env.getConfig(),
env.getCheckpointConfig());
graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());

final Map<ManagedMemoryUseCase, Integer> expectedOperatorWeights = new HashMap<>();
expectedOperatorWeights.put(
ManagedMemoryUseCase.OPERATOR,
ExecutionOptions.SORTED_INPUTS_MEMORY.defaultValue().getMebiBytes());
assertThat(
processNode.getManagedMemoryOperatorScopeUseCaseWeights(),
equalTo(expectedOperatorWeights));
assertThat(
processNode.getManagedMemorySlotScopeUseCases(),
equalTo(Collections.singleton(ManagedMemoryUseCase.STATE_BACKEND)));
}

@Test
public void testCustomManagedMemoryWeights() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> process =
env.fromElements(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());

final Configuration configuration = new Configuration();
configuration.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.ofMebiBytes(42));

StreamGraphGenerator graphGenerator =
new StreamGraphGenerator(
Collections.singletonList(sink.getTransformation()),
env.getConfig(),
env.getCheckpointConfig(),
configuration);
graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());

final Map<ManagedMemoryUseCase, Integer> expectedOperatorWeights = new HashMap<>();
expectedOperatorWeights.put(ManagedMemoryUseCase.OPERATOR, 42);
assertThat(
processNode.getManagedMemoryOperatorScopeUseCaseWeights(),
equalTo(expectedOperatorWeights));
assertThat(
processNode.getManagedMemorySlotScopeUseCases(),
equalTo(Collections.singleton(ManagedMemoryUseCase.STATE_BACKEND)));
}

@Test
public void testOneInputTransformation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -234,7 +234,8 @@ public class ExecutionConfigOptions {
public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY =
key("table.exec.resource.hash-join.memory")
.memoryType()
.defaultValue(MemorySize.parse("128 mb"))
// in sync with other weights from Table API and DataStream API
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription(
"Sets the managed memory for hash join operator. It defines the lower"
+ " limit. Note: memory size is only a weight hint, it will affect the weight of"
@@ -247,7 +248,8 @@ public class ExecutionConfigOptions {
public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_SORT_MEMORY =
key("table.exec.resource.sort.memory")
.memoryType()
.defaultValue(MemorySize.parse("128 mb"))
// in sync with other weights from Table API and DataStream API
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription(
"Sets the managed buffer memory size for sort operator. Note: memory"
+ " size is only a weight hint, it will affect the weight of memory that can be"
@@ -118,8 +118,9 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
// set resources
multipleInputTransform.setResources(
generator.getMinResources(), generator.getPreferredResources());
long memoryKB = generator.getManagedMemoryWeight();
ExecNodeUtil.setManagedMemoryWeight(multipleInputTransform, memoryKB * 1024L);
final int memoryWeight = generator.getManagedMemoryWeight();
final long memoryBytes = (long) memoryWeight << 20;
ExecNodeUtil.setManagedMemoryWeight(multipleInputTransform, memoryBytes);

// set chaining strategy for source chaining
multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
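Editor's note: the bit shift above is plain MiB-to-bytes arithmetic; a tiny sketch (not part of this commit) of the round trip:

// Illustrative sketch: converting the generator's mebibyte weight into bytes and back.
int memoryWeight = 128;                        // MiB, as returned by the generator
long memoryBytes = (long) memoryWeight << 20;  // 128 * 1024 * 1024 = 134_217_728 bytes
int roundTrip = Math.max(1, (int) (memoryBytes >> 20)); // 128 MiB again, as derived in ExecNodeUtil
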
@@ -35,19 +35,16 @@
/** A utility class that helps translating {@link ExecNode} to {@link Transformation}. */
public class ExecNodeUtil {
/**
* Set memoryBytes to {@link
* Transformation#declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase, int)}.
* Sets {@link Transformation#declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase, int)}
* using the given bytes for {@link ManagedMemoryUseCase#OPERATOR}.
*/
public static <T> void setManagedMemoryWeight(
Transformation<T> transformation, long memoryBytes) {
// Using Bytes can easily overflow
// Using KibiBytes to cast to int
// Careful about zero
if (memoryBytes > 0) {
int memoryKibiBytes = (int) Math.max(1, (memoryBytes >> 10));
Optional<Integer> previousWeight =
final int weightInMebibyte = Math.max(1, (int) (memoryBytes >> 20));
final Optional<Integer> previousWeight =
transformation.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR, memoryKibiBytes);
ManagedMemoryUseCase.OPERATOR, weightInMebibyte);
if (previousWeight.isPresent()) {
throw new TableException(
"Managed memory weight has been set, this should not happen.");
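Editor's note: a usage sketch (not part of this commit; the transformation shown is only illustrative) of calling the utility with a byte value, from which the mebibyte weight is derived internally:

// Illustrative sketch: declaring 128 MiB of operator-scope managed memory via the utility.
// Non-positive byte values are ignored; positive values are clamped to at least a 1 MiB weight.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Transformation<Integer> transformation = env.fromElements(1, 2, 3).getTransformation();
ExecNodeUtil.setManagedMemoryWeight(transformation, MemorySize.ofMebiBytes(128).getBytes());
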
@@ -82,8 +82,9 @@ public class TableOperatorWrapperGenerator {
private int maxParallelism;
private ResourceSpec minResources;
private ResourceSpec preferredResources;
/** managed memory weight for batch operator. */
private long managedMemoryWeight;

/** Managed memory weight for batch operator in mebibyte. */
private int managedMemoryWeight;

public TableOperatorWrapperGenerator(
List<Transformation<?>> inputTransforms, Transformation<?> tailTransform) {
@@ -139,7 +140,7 @@ public ResourceSpec getPreferredResources() {
return preferredResources;
}

public long getManagedMemoryWeight() {
public int getManagedMemoryWeight() {
return managedMemoryWeight;
}

Expand Down
