Skip to content

Commit

Permalink
[FLINK-19640] Enable sorting inputs for batch
Browse files Browse the repository at this point in the history
This PR adds feature flags for enabling/disabling the sorting inputs and
special types of a state backend and a timer service for BATCH execution
runtime. Those options are enabled by default for BATCH runtime
execution mode.
  • Loading branch information
dawidwys committed Oct 16, 2020
1 parent daa54cb commit 2ff3b77
Show file tree
Hide file tree
Showing 11 changed files with 705 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

Expand Down Expand Up @@ -55,4 +56,25 @@ public class ExecutionOptions {
"throughput")
)
.build());

@Documentation.ExcludeFromDocumentation("This is an expert option, that we do not want to expose in" +
" the documentation")
public static final ConfigOption<Boolean> SORT_INPUTS =
ConfigOptions.key("execution.sorted-inputs.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"A flag to enable or disable sorting inputs of keyed operators. " +
"NOTE: It takes effect only in the BATCH runtime mode.");

@Documentation.ExcludeFromDocumentation("This is an expert option, that we do not want to expose in" +
" the documentation")
public static final ConfigOption<Boolean> USE_BATCH_STATE_BACKEND =
ConfigOptions.key("execution.batch-state-backend.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"A flag to enable or disable batch runtime specific state backend and timer service for keyed" +
" operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" +
SORT_INPUTS.key() + " to be enabled.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
});
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(
sortInputs -> this.getConfiguration().set(ExecutionOptions.SORT_INPUTS, sortInputs)
);
configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(
sortInputs -> this.getConfiguration().set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs)
);
}

private void registerCustomListeners(final ClassLoader classLoader, final List<String> listeners) {
Expand Down Expand Up @@ -1915,7 +1921,7 @@ private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
return new StreamGraphGenerator(transformations, config, checkpointCfg)
return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
Expand All @@ -33,6 +36,8 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
Expand Down Expand Up @@ -113,6 +118,8 @@ public class StreamGraphGenerator {

private final CheckpointConfig checkpointConfig;

private final ReadableConfig configuration;

private StateBackend stateBackend;

private boolean chaining = true;
Expand Down Expand Up @@ -146,6 +153,7 @@ public class StreamGraphGenerator {

// This is used to assign a unique ID to iteration source/sink
protected static Integer iterationIdCounter = 0;

public static int getNewIterationNodeId() {
iterationIdCounter--;
return iterationIdCounter;
Expand All @@ -161,9 +169,23 @@ public StreamGraphGenerator(
final List<Transformation<?>> transformations,
final ExecutionConfig executionConfig,
final CheckpointConfig checkpointConfig) {
this(
transformations,
executionConfig,
checkpointConfig,
new Configuration()
);
}

public StreamGraphGenerator(
List<Transformation<?>> transformations,
ExecutionConfig executionConfig,
CheckpointConfig checkpointConfig,
ReadableConfig configuration) {
this.transformations = checkNotNull(transformations);
this.executionConfig = checkNotNull(executionConfig);
this.checkpointConfig = checkNotNull(checkpointConfig);
this.configuration = checkNotNull(configuration);
}

public StreamGraphGenerator setRuntimeExecutionMode(final RuntimeExecutionMode runtimeExecutionMode) {
Expand Down Expand Up @@ -228,7 +250,6 @@ public StreamGraph generate() {
private void configureStreamGraph(final StreamGraph graph) {
checkNotNull(graph);

graph.setStateBackend(stateBackend);
graph.setChaining(chaining);
graph.setUserArtifacts(userArtifacts);
graph.setTimeCharacteristic(timeCharacteristic);
Expand All @@ -239,13 +260,31 @@ private void configureStreamGraph(final StreamGraph graph) {
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
setDefaultBufferTimeout(-1);
setBatchStateBackendAndTimerService(graph);
} else {
graph.setStateBackend(stateBackend);
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.EAGER);
}
}

private void setBatchStateBackendAndTimerService(StreamGraph graph) {
boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
checkState(
!useStateBackend || sortInputs,
"Batch state backend requires the sorted inputs to be enabled!");

if (useStateBackend) {
LOG.debug("Using BATCH execution state backend and timer service.");
graph.setStateBackend(new BatchExecutionStateBackend());
graph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
} else {
graph.setStateBackend(stateBackend);
}
}

private boolean shouldExecuteInBatchMode(final RuntimeExecutionMode configuredMode) {
if (checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
return configuredMode == RuntimeExecutionMode.BATCH;
Expand Down Expand Up @@ -691,7 +730,7 @@ private Collection<Integer> translate(
.collect(Collectors.toList()));

final TransformationTranslator.Context context = new ContextImpl(
this, streamGraph, slotSharingGroup);
this, streamGraph, slotSharingGroup, configuration);

return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
Expand Down Expand Up @@ -756,13 +795,17 @@ private static class ContextImpl implements TransformationTranslator.Context {

private final String slotSharingGroup;

private final ReadableConfig config;

public ContextImpl(
final StreamGraphGenerator streamGraphGenerator,
final StreamGraph streamGraph,
final String slotSharingGroup) {
final String slotSharingGroup,
final ReadableConfig config) {
this.streamGraphGenerator = checkNotNull(streamGraphGenerator);
this.streamGraph = checkNotNull(streamGraph);
this.slotSharingGroup = checkNotNull(slotSharingGroup);
this.config = checkNotNull(config);
}

@Override
Expand All @@ -787,5 +830,10 @@ public String getSlotSharingGroup() {
public long getDefaultBufferTimeout() {
return streamGraphGenerator.defaultBufferTimeout;
}

@Override
public ReadableConfig getGraphGeneratorConfig() {
return config;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,11 @@ public void setUserHash(String userHash) {
this.userHash = userHash;
}

@VisibleForTesting
public void setSortedInputs(boolean sortedInputs) {
this.sortedInputs = sortedInputs;
}

boolean getSortedInputs() {
public boolean getSortedInputs() {
return sortedInputs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;

import java.util.Collection;

Expand Down Expand Up @@ -88,5 +89,10 @@ interface Context {
* Returns the default buffer timeout to be used.
*/
long getDefaultBufferTimeout();

/**
* Retrieves additional configuration for the graph generation process.
*/
ReadableConfig getGraphGeneratorConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.io.Serializable;

/**
* An entity keeping all the time-related services.
*
Expand Down Expand Up @@ -68,7 +70,7 @@ void snapshotState(
* Allows substituting the manager that will be used at the runtime.
*/
@FunctionalInterface
interface Provider {
interface Provider extends Serializable {
<K> InternalTimeServiceManager<K> create(
CheckpointableKeyedStateBackend<K> keyedStatedBackend,
ClassLoader userClassloader,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.translators;

import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A utility class for applying sorting inputs.
*/
class BatchExecutionUtils {
private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class);

static void applySortingInputs(
int transformationId,
TransformationTranslator.Context context) {
StreamNode node = context.getStreamGraph().getStreamNode(transformationId);
boolean sortInputs = context.getGraphGeneratorConfig().get(ExecutionOptions.SORT_INPUTS);
boolean isInputSelectable = isInputSelectable(node);

adjustChainingStrategy(node);

checkState(
!isInputSelectable || !sortInputs,
"Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator."
);

if (sortInputs) {
LOG.debug("Enabling sorting inputs for an operator {}.", node);
node.setSortedInputs(true);
Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = new HashMap<>();
operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1);
node.setManagedMemoryUseCaseWeights(
operatorScopeUseCaseWeights,
Collections.emptySet()
);
}
}

@SuppressWarnings("rawtypes")
private static boolean isInputSelectable(StreamNode node) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<? extends StreamOperator> operatorClass = node.getOperatorFactory()
.getStreamOperatorClass(classLoader);
return InputSelectable.class.isAssignableFrom(operatorClass);
}

private static void adjustChainingStrategy(StreamNode node) {
StreamOperatorFactory<?> operatorFactory = node.getOperatorFactory();
ChainingStrategy currentChainingStrategy = operatorFactory.getChainingStrategy();
switch (currentChainingStrategy) {
case ALWAYS:
case HEAD_WITH_SOURCES:
LOG.debug(
"Setting chaining strategy to HEAD for operator {}, because of the BATCH execution mode.",
node);
operatorFactory.setChainingStrategy(ChainingStrategy.HEAD);
break;
case NEVER:
case HEAD:
break;
}
}

private BatchExecutionUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public class MultiInputTransformationTranslator<OUT>
protected Collection<Integer> translateForBatchInternal(
final AbstractMultipleInputTransformation<OUT> transformation,
final Context context) {
return translateInternal(transformation, context);
Collection<Integer> ids = translateInternal(transformation, context);
boolean isKeyed = transformation instanceof KeyedMultipleInputTransformation;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
}
return ids;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ public class OneInputTransformationTranslator<IN, OUT> extends SimpleTransformat
public Collection<Integer> translateForBatchInternal(
final OneInputTransformation<IN, OUT> transformation,
final Context context) {
return translateInternal(transformation, context);
Collection<Integer> ids = translateInternal(transformation, context);
boolean isKeyed = transformation.getStateKeySelector() != null;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
}

return ids;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ public class TwoInputTransformationTranslator<IN1, IN2, OUT>
protected Collection<Integer> translateForBatchInternal(
final TwoInputTransformation<IN1, IN2, OUT> transformation,
final Context context) {
return translateInternal(transformation, context);
Collection<Integer> ids = translateInternal(transformation, context);
boolean isKeyed =
transformation.getStateKeySelector1() != null && transformation.getStateKeySelector2() != null;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
}
return ids;
}

@Override
Expand Down
Loading

0 comments on commit 2ff3b77

Please sign in to comment.