Commit f9c23a0

[FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy

Avoid re-computing pipelined regions in the
RestartPipelinedRegionFailoverStrategy using the
PipelinedRegionComputeUtil. Instead, rely on the pipelined regions
provided by the Topology.

This closes apache#11857.
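In code, the gist of the change is the sketch below. It is illustrative only: the calls PipelinedRegionComputeUtil.computePipelinedRegions, getPipelinedRegionOfVertex, getVertices, and getId are taken from the diff that follows, while the helper class, its name, and its parameters are hypothetical.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
    import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
    import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

    // Hypothetical helper, not part of this commit. Before the change, the
    // strategy eagerly computed all regions via
    // PipelinedRegionComputeUtil.computePipelinedRegions(topology) and cached
    // its own vertex-to-region map. After the change, a region is obtained on
    // demand from the topology itself:
    final class RegionLookupSketch {

        static Set<ExecutionVertexID> verticesInSameRegion(
                SchedulingTopology topology,
                ExecutionVertexID vertexId) {

            // Single lookup against the regions the topology already maintains.
            final SchedulingPipelinedRegion region = topology.getPipelinedRegionOfVertex(vertexId);

            // Collect the IDs of all vertices in that region, as the patched
            // strategy does in getTasksNeedingRestart(...).
            final Set<ExecutionVertexID> ids = new HashSet<>();
            region.getVertices().forEach(vertex -> ids.add(vertex.getId()));
            return ids;
        }
    }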
GJL committed Apr 24, 2020
1 parent 95b3c95 commit f9c23a0
Showing 3 changed files with 78 additions and 178 deletions.

This file was deleted.

RestartPipelinedRegionFailoverStrategy.java

@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.util.ExceptionUtils;
@@ -31,10 +32,8 @@
 
 import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -53,12 +52,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
 	/** The topology containing info about all the vertices and result partitions. */
 	private final SchedulingTopology topology;
 
-	/** All failover regions. */
-	private final Set<FailoverRegion> regions;
-
-	/** Maps execution vertex id to failover region. */
-	private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
-
 	/** The checker helps to query result partition availability. */
 	private final RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
 
@@ -84,34 +77,8 @@ public RestartPipelinedRegionFailoverStrategy(
 			ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
 
 		this.topology = checkNotNull(topology);
-		this.regions = Collections.newSetFromMap(new IdentityHashMap<>());
-		this.vertexToRegionMap = new HashMap<>();
 		this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(
 			resultPartitionAvailabilityChecker);
-
-		// build regions based on the given topology
-		LOG.info("Start building failover regions.");
-		buildFailoverRegions();
 	}
-	// ------------------------------------------------------------------------
-	//  region building
-	// ------------------------------------------------------------------------
-
-	private void buildFailoverRegions() {
-		final Set<Set<SchedulingExecutionVertex>> distinctRegions =
-			PipelinedRegionComputeUtil.computePipelinedRegions(topology);
-
-		// creating all the failover regions and register them
-		for (Set<SchedulingExecutionVertex> regionVertices : distinctRegions) {
-			LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
-			final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
-			regions.add(failoverRegion);
-			for (SchedulingExecutionVertex vertex : regionVertices) {
-				vertexToRegionMap.put(vertex.getId(), failoverRegion);
-			}
-		}
-
-		LOG.info("Created {} failover regions.", regions.size());
-	}
 
 
@@ -136,7 +103,7 @@ private void buildFailoverRegions() {
 	public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
 		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
 
-		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
+		final SchedulingPipelinedRegion failedRegion = topology.getPipelinedRegionOfVertex(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
@@ -153,8 +120,8 @@ public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID execution
 
 		// calculate the tasks to restart based on the result of regions to restart
 		Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
-		for (FailoverRegion region : getRegionsToRestart(failedRegion)) {
-			tasksToRestart.addAll(region.getAllExecutionVertexIDs());
+		for (SchedulingPipelinedRegion region : getRegionsToRestart(failedRegion)) {
+			region.getVertices().forEach(vertex -> tasksToRestart.add(vertex.getId()));
 		}
 
 		// the previous failed partition will be recovered. remove its failed state from the checker
@@ -175,25 +142,25 @@ public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID execution
 	 *    the region containing the partition producer task is involved
	 * 3. If a region is involved, all of its consumer regions are involved
	 */
-	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
-		Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
-		Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());
+	private Set<SchedulingPipelinedRegion> getRegionsToRestart(SchedulingPipelinedRegion failedRegion) {
+		Set<SchedulingPipelinedRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
+		Set<SchedulingPipelinedRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());
 
 		// start from the failed region to visit all involved regions
-		Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+		Queue<SchedulingPipelinedRegion> regionsToVisit = new ArrayDeque<>();
 		visitedRegions.add(failedRegion);
 		regionsToVisit.add(failedRegion);
 		while (!regionsToVisit.isEmpty()) {
-			FailoverRegion regionToRestart = regionsToVisit.poll();
+			SchedulingPipelinedRegion regionToRestart = regionsToVisit.poll();
 
 			// an involved region should be restarted
 			regionsToRestart.add(regionToRestart);
 
 			// if a needed input result partition is not available, its producer region is involved
-			for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) {
+			for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) {
 				for (SchedulingResultPartition consumedPartition : vertex.getConsumedResults()) {
 					if (!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) {
-						FailoverRegion producerRegion = vertexToRegionMap.get(consumedPartition.getProducer().getId());
+						SchedulingPipelinedRegion producerRegion = topology.getPipelinedRegionOfVertex(consumedPartition.getProducer().getId());
 						if (!visitedRegions.contains(producerRegion)) {
 							visitedRegions.add(producerRegion);
 							regionsToVisit.add(producerRegion);
@@ -203,10 +170,10 @@ private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
 			}
 
 			// all consumer regions of an involved region should be involved
-			for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) {
+			for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) {
 				for (SchedulingResultPartition producedPartition : vertex.getProducedResults()) {
 					for (SchedulingExecutionVertex consumerVertex : producedPartition.getConsumers()) {
-						FailoverRegion consumerRegion = vertexToRegionMap.get(consumerVertex.getId());
+						SchedulingPipelinedRegion consumerRegion = topology.getPipelinedRegionOfVertex(consumerVertex.getId());
 						if (!visitedRegions.contains(consumerRegion)) {
 							visitedRegions.add(consumerRegion);
 							regionsToVisit.add(consumerRegion);
@@ -229,8 +196,8 @@ private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
 	 * @return the failover region that contains the given execution vertex
 	 */
 	@VisibleForTesting
-	public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
-		return vertexToRegionMap.get(vertexID);
+	public SchedulingPipelinedRegion getFailoverRegion(ExecutionVertexID vertexID) {
+		return topology.getPipelinedRegionOfVertex(vertexID);
 	}
 
 	/**
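To make the new control flow concrete, a minimal usage sketch under stated assumptions: the constructor parameters and the getTasksNeedingRestart signature match the diff above, while myTopology, myChecker, and failedVertex are placeholder names for objects the scheduler would normally supply.

    // Hypothetical call site, not part of this commit:
    RestartPipelinedRegionFailoverStrategy strategy =
        new RestartPipelinedRegionFailoverStrategy(myTopology, myChecker);

    // The constructor no longer builds failover regions up front; each failure
    // resolves its region lazily via topology.getPipelinedRegionOfVertex(...).
    Set<ExecutionVertexID> tasksToRestart =
        strategy.getTasksNeedingRestart(failedVertex, new Exception("test failure"));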
