Commit a56a4d3
[FLINK-30233] Speculative scheduler supports hybrid result partition.
This closes apache#21419
reswqa authored and xintongsong committed Dec 22, 2022
1 parent ca8f5cf commit a56a4d3
Showing 5 changed files with 62 additions and 19 deletions.
AdaptiveBatchSchedulerFactory.java
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
@@ -56,6 +57,10 @@
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
+ import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
+ import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
+ import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
+ import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
@@ -102,8 +107,7 @@ public SchedulerNG createInstance(
checkState(
jobGraph.getJobType() == JobType.BATCH,
"Adaptive batch scheduler only supports batch jobs");
- checkAllExchangesBlocking(jobGraph);
-
+ checkAllExchangesAreSupported(jobGraph);
final SlotPool slotPool =
slotPoolService
.castInto(SlotPool.class)
@@ -155,6 +159,10 @@ public SchedulerNG createInstance(
true,
createExecutionJobVertexFactory(enableSpeculativeExecution));

+ final SchedulingStrategyFactory schedulingStrategyFactory =
+         new VertexwiseSchedulingStrategy.Factory(
+                 loadInputConsumableDeciderFactory(hybridOnlyConsumeFinishedPartition));
+
if (enableSpeculativeExecution) {
return new SpeculativeScheduler(
log,
@@ -167,7 +175,7 @@
new CheckpointsCleaner(),
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
- new VertexwiseSchedulingStrategy.Factory(),
+ schedulingStrategyFactory,
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
jobMasterConfiguration),
restartBackoffTimeStrategy,
@@ -197,7 +205,7 @@ public SchedulerNG createInstance(
new CheckpointsCleaner(),
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
- new VertexwiseSchedulingStrategy.Factory(),
+ schedulingStrategyFactory,
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
jobMasterConfiguration),
restartBackoffTimeStrategy,
@@ -217,6 +225,18 @@ public SchedulerNG createInstance(
}
}

+ public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory(
+         boolean hybridOnlyConsumeFinishedPartition) {
+     // When hybridOnlyConsumeFinishedPartition is true, the input could in principle
+     // be considered consumable once some of its partitions have finished. The
+     // AllFinishedInputConsumableDecider is adopted here because consuming a partially
+     // finished upstream task is not supported yet; that will be added in subsequent
+     // commits.
+     return hybridOnlyConsumeFinishedPartition
+             ? AllFinishedInputConsumableDecider.Factory.INSTANCE
+             : DefaultInputConsumableDecider.Factory.INSTANCE;
+ }

private boolean getHybridOnlyConsumeFinishedPartition(
Configuration configuration, boolean enableSpeculativeExecution) {
final boolean hybridOnlyConsumeFinishedPartition =
@@ -251,17 +271,21 @@ private static ExecutionJobVertex.Factory createExecutionJobVertexFactory(
}
}

- private void checkAllExchangesBlocking(final JobGraph jobGraph) {
+ private void checkAllExchangesAreSupported(final JobGraph jobGraph) {
for (JobVertex jobVertex : jobGraph.getVertices()) {
for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
checkState(
- dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition(),
+ dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition()
+         || dataSet.getResultType() == ResultPartitionType.HYBRID_FULL
+         || dataSet.getResultType() == ResultPartitionType.HYBRID_SELECTIVE,
String.format(
"At the moment, adaptive batch scheduler requires batch workloads "
+ "to be executed with types of all edges being BLOCKING. "
+ "To do that, you need to configure '%s' to '%s'.",
+ "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. "
+ "To do that, you need to configure '%s' to '%s' or '%s/%s'.",
ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
- BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+ BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+ BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
+ BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE));
}
}
}
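For context, a job opts into one of the newly supported shuffle modes through the 'execution.batch-shuffle-mode' option. Below is a minimal driver sketch using the standard Flink configuration API; the class name and job body are placeholders, not part of this commit:

    import org.apache.flink.api.common.BatchShuffleMode;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ExecutionOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HybridShuffleBatchJob {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            // Any of ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or
            // ALL_EXCHANGES_HYBRID_SELECTIVE now passes checkAllExchangesAreSupported.
            conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);

            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            // ... define sources, transformations, and sinks here ...
            env.execute("hybrid shuffle batch job");
        }
    }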
DefaultInputConsumableDecider.java
@@ -34,7 +34,7 @@
* <p>For canBePipelined consumed partition group: whether all result partitions in the group are
* scheduled.
*/
- class DefaultInputConsumableDecider implements InputConsumableDecider {
+ public class DefaultInputConsumableDecider implements InputConsumableDecider {
private final Function<IntermediateResultPartitionID, SchedulingResultPartition>
resultPartitionRetriever;

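To illustrate the policy difference that the injected decider selects between, here is a simplified, self-contained model. It is hypothetical: the real InputConsumableDecider interface operates on Flink's scheduling-topology types, and all names below are illustrative only.

    import java.util.List;

    // Hypothetical states of an upstream result partition.
    enum PartitionState { CREATED, SCHEDULED, RUNNING, FINISHED }

    interface SimpleDecider {
        boolean isInputConsumable(List<PartitionState> upstreamPartitions);
    }

    // Mirrors AllFinishedInputConsumableDecider: input is consumable only
    // after every upstream partition has finished.
    class AllFinishedPolicy implements SimpleDecider {
        public boolean isInputConsumable(List<PartitionState> upstream) {
            return upstream.stream().allMatch(s -> s == PartitionState.FINISHED);
        }
    }

    // Mirrors DefaultInputConsumableDecider: a canBePipelined (e.g. hybrid)
    // input is consumable once all producers are at least scheduled, while a
    // blocking input still requires all partitions to be finished.
    class DefaultPolicy implements SimpleDecider {
        private final boolean canBePipelined;

        DefaultPolicy(boolean canBePipelined) {
            this.canBePipelined = canBePipelined;
        }

        public boolean isInputConsumable(List<PartitionState> upstream) {
            if (canBePipelined) {
                return upstream.stream()
                        .allMatch(s -> s.compareTo(PartitionState.SCHEDULED) >= 0);
            }
            return upstream.stream().allMatch(s -> s == PartitionState.FINISHED);
        }
    }

This is why, per the comment in the factory method above, speculative execution over hybrid partitions currently falls back to the all-finished policy: consuming a partially finished upstream task is not supported yet.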
VertexwiseSchedulingStrategy.java
@@ -196,14 +196,18 @@ private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToSch

/** The factory for creating {@link VertexwiseSchedulingStrategy}. */
public static class Factory implements SchedulingStrategyFactory {
+ private final InputConsumableDecider.Factory inputConsumableDeciderFactory;
+
+ public Factory(InputConsumableDecider.Factory inputConsumableDeciderFactory) {
+     this.inputConsumableDeciderFactory = inputConsumableDeciderFactory;
+ }

@Override
public SchedulingStrategy createInstance(
final SchedulerOperations schedulerOperations,
final SchedulingTopology schedulingTopology) {
return new VertexwiseSchedulingStrategy(
-         schedulerOperations,
-         schedulingTopology,
-         DefaultInputConsumableDecider.Factory.INSTANCE);
+         schedulerOperations, schedulingTopology, inputConsumableDeciderFactory);
}
}
}
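Since the decider is now injected through the constructor, the same strategy class serves both schedulers. The wiring below mirrors the factory method and the updated test builders in this commit; the helper class name is illustrative:

    import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
    import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
    import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
    import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;

    class StrategyWiring {
        // Plain adaptive batch scheduling keeps the default decider.
        static SchedulingStrategyFactory adaptiveBatch() {
            return new VertexwiseSchedulingStrategy.Factory(
                    DefaultInputConsumableDecider.Factory.INSTANCE);
        }

        // Speculative scheduling over hybrid partitions consumes only finished input.
        static SchedulingStrategyFactory speculativeHybrid() {
            return new VertexwiseSchedulingStrategy.Factory(
                    AllFinishedInputConsumableDecider.Factory.INSTANCE);
        }
    }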
DefaultSchedulerBuilder.java
@@ -44,6 +44,8 @@
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
+ import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
+ import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
@@ -294,7 +296,8 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception
checkpointCleaner,
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
- new VertexwiseSchedulingStrategy.Factory(),
+ new VertexwiseSchedulingStrategy.Factory(
+         DefaultInputConsumableDecider.Factory.INSTANCE),
failoverStrategyFactory,
restartBackoffTimeStrategy,
executionOperations,
@@ -323,7 +326,8 @@ public SpeculativeScheduler buildSpeculativeScheduler() throws Exception {
checkpointCleaner,
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
- new VertexwiseSchedulingStrategy.Factory(),
+ new VertexwiseSchedulingStrategy.Factory(
+         AllFinishedInputConsumableDecider.Factory.INSTANCE),
failoverStrategyFactory,
restartBackoffTimeStrategy,
executionOperations,
SpeculativeSchedulerTest.java
@@ -61,6 +61,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.MethodSource;

import java.time.Duration;
import java.util.ArrayList;
@@ -71,6 +73,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+ import java.util.stream.Stream;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
@@ -342,12 +345,20 @@ void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
}

- @Test
- void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
+ static Stream<ResultPartitionType> supportedResultPartitionType() {
+     return Stream.of(
+             ResultPartitionType.BLOCKING,
+             ResultPartitionType.HYBRID_FULL,
+             ResultPartitionType.HYBRID_SELECTIVE);
+ }
+
+ @ParameterizedTest
+ @MethodSource("supportedResultPartitionType")
+ void testSpeculativeExecutionCombinedWithAdaptiveScheduling(
+         ResultPartitionType resultPartitionType) throws Exception {
final JobVertex source = createNoOpVertex("source", 1);
final JobVertex sink = createNoOpVertex("sink", -1);
- sink.connectNewDataSetAsInput(
-         source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);
final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);

final ComponentMainThreadExecutor mainThreadExecutor =
