Skip to content

Commit

Permalink
[FLINK-17180][runtime] Use SchedulingPipelinedRegion in RegionPartiti…
Browse files Browse the repository at this point in the history
…onReleaseStrategy

Avoid re-computing pipelined regions in the
RegionPartitionReleaseStrategy using the PipelinedRegionComputeUtil.
Instead, rely on the pipelined regions provided by the Topology.
  • Loading branch information
GJL committed Apr 24, 2020
1 parent 23c13bb commit 95b3c95
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.topology.BaseTopology;
import org.apache.flink.runtime.topology.Result;
import org.apache.flink.runtime.topology.Vertex;
Expand All @@ -34,8 +31,6 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Utility for computing pipelined regions.
Expand All @@ -44,21 +39,6 @@ public final class PipelinedRegionComputeUtil {

private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);

public static Set<PipelinedRegion> toPipelinedRegionsSet(
final Set<? extends Set<? extends SchedulingExecutionVertex>> distinctRegions) {

return distinctRegions.stream()
.map(toExecutionVertexIdSet())
.map(PipelinedRegion::from)
.collect(Collectors.toSet());
}

private static Function<Set<? extends SchedulingExecutionVertex>, Set<ExecutionVertexID>> toExecutionVertexIdSet() {
return failoverVertices -> failoverVertices.stream()
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet());
}

public static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>> Set<Set<V>> computePipelinedRegions(
final BaseTopology<?, ?, V, R> topology) {

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,49 @@
package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.util.IterableUtils;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Provides a virtual execution state of a {@link PipelinedRegion}.
* Provides a virtual execution state of a {@link SchedulingPipelinedRegion}.
*
* <p>A pipelined region can be either finished or unfinished. It is finished iff. all its
* executions have reached the finished state.
*/
class PipelinedRegionExecutionView {

private final PipelinedRegion pipelinedRegion;
private final SchedulingPipelinedRegion pipelinedRegion;

private final Set<ExecutionVertexID> unfinishedVertices;

PipelinedRegionExecutionView(final PipelinedRegion pipelinedRegion) {
PipelinedRegionExecutionView(final SchedulingPipelinedRegion pipelinedRegion) {
this.pipelinedRegion = checkNotNull(pipelinedRegion);
this.unfinishedVertices = new HashSet<>(pipelinedRegion.getExecutionVertexIds());
this.unfinishedVertices = IterableUtils.toStream(pipelinedRegion.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet());
}

public boolean isFinished() {
return unfinishedVertices.isEmpty();
}

public void vertexFinished(final ExecutionVertexID executionVertexId) {
checkArgument(pipelinedRegion.contains(executionVertexId));
assertVertexInRegion(executionVertexId);
unfinishedVertices.remove(executionVertexId);
}

public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
checkArgument(pipelinedRegion.contains(executionVertexId));
assertVertexInRegion(executionVertexId);
unfinishedVertices.add(executionVertexId);
}

public PipelinedRegion getPipelinedRegion() {
return pipelinedRegion;
private void assertVertexInRegion(final ExecutionVertexID executionVertexId) {
pipelinedRegion.getVertex(executionVertexId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,105 +19,56 @@

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Releases blocking intermediate result partitions that are incident to a {@link PipelinedRegion},
* Releases blocking intermediate result partitions that are incident to a {@link SchedulingPipelinedRegion},
* as soon as the region's execution vertices are finished.
*/
public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy {

private final SchedulingTopology schedulingTopology;

private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>();

private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();

public RegionPartitionReleaseStrategy(
final SchedulingTopology schedulingTopology,
final Set<PipelinedRegion> pipelinedRegions) {

public RegionPartitionReleaseStrategy(final SchedulingTopology schedulingTopology) {
this.schedulingTopology = checkNotNull(schedulingTopology);

checkNotNull(pipelinedRegions);
initConsumedBlockingPartitionsByRegion(pipelinedRegions);
initRegionExecutionViewByVertex(pipelinedRegions);
}

private void initConsumedBlockingPartitionsByRegion(final Set<PipelinedRegion> pipelinedRegions) {
for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
final PipelinedRegionConsumedBlockingPartitions consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion);
consumedBlockingPartitionsByRegion.put(pipelinedRegion, consumedPartitions);
}
initRegionExecutionViewByVertex();
}

private void initRegionExecutionViewByVertex(final Set<PipelinedRegion> pipelinedRegions) {
for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
private void initRegionExecutionViewByVertex() {
for (SchedulingPipelinedRegion pipelinedRegion : schedulingTopology.getAllPipelinedRegions()) {
final PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
for (ExecutionVertexID executionVertexId : pipelinedRegion) {
regionExecutionViewByVertex.put(executionVertexId, regionExecutionView);
for (SchedulingExecutionVertex executionVertexId : pipelinedRegion.getVertices()) {
regionExecutionViewByVertex.put(executionVertexId.getId(), regionExecutionView);
}
}
}

private PipelinedRegionConsumedBlockingPartitions computeConsumedPartitionsOfVertexRegion(final PipelinedRegion pipelinedRegion) {
final Set<IntermediateResultPartitionID> resultPartitionsOutsideOfRegion = findResultPartitionsOutsideOfRegion(pipelinedRegion);
return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion);
}

private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
final Set<SchedulingResultPartition> allConsumedPartitionsInRegion = pipelinedRegion
.getExecutionVertexIds()
.stream()
.map(schedulingTopology::getVertex)
.flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults()))
.collect(Collectors.toSet());

return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion);
}

private static Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion(
final Collection<SchedulingResultPartition> resultPartitions,
final PipelinedRegion pipelinedRegion) {

final Set<IntermediateResultPartitionID> result = new HashSet<>();
for (final SchedulingResultPartition maybeOutsidePartition : resultPartitions) {
final SchedulingExecutionVertex producer = maybeOutsidePartition.getProducer();
if (!pipelinedRegion.contains(producer.getId())) {
result.add(maybeOutsidePartition.getId());
}
}
return result;
}

@Override
public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(finishedVertex);
regionExecutionView.vertexFinished(finishedVertex);

if (regionExecutionView.isFinished()) {
final PipelinedRegion pipelinedRegion = getPipelinedRegionForVertex(finishedVertex);
final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion = getConsumedBlockingPartitionsForRegion(pipelinedRegion);
return filterReleasablePartitions(consumedPartitionsOfVertexRegion);
final SchedulingPipelinedRegion pipelinedRegion = schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
return filterReleasablePartitions(pipelinedRegion.getConsumedResults());
}
return Collections.emptyList();
}
Expand All @@ -135,23 +86,9 @@ private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(fi
return pipelinedRegionExecutionView;
}

private PipelinedRegion getPipelinedRegionForVertex(final ExecutionVertexID executionVertexId) {
final PipelinedRegionExecutionView pipelinedRegionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId);
return pipelinedRegionExecutionView.getPipelinedRegion();
}

private PipelinedRegionConsumedBlockingPartitions getConsumedBlockingPartitionsForRegion(final PipelinedRegion pipelinedRegion) {
final PipelinedRegionConsumedBlockingPartitions pipelinedRegionConsumedBlockingPartitions = consumedBlockingPartitionsByRegion.get(pipelinedRegion);
checkState(pipelinedRegionConsumedBlockingPartitions != null,
"Consumed partitions not found for pipelined region %s", pipelinedRegion);
checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == pipelinedRegion);
return pipelinedRegionConsumedBlockingPartitions;
}

private List<IntermediateResultPartitionID> filterReleasablePartitions(final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion) {
return consumedPartitionsOfVertexRegion
.getConsumedBlockingPartitions()
.stream()
private List<IntermediateResultPartitionID> filterReleasablePartitions(final Iterable<? extends SchedulingResultPartition> schedulingResultPartitions) {
return IterableUtils.toStream(schedulingResultPartitions)
.map(SchedulingResultPartition::getId)
.filter(this::areConsumerRegionsFinished)
.collect(Collectors.toList());
}
Expand All @@ -175,13 +112,7 @@ public static class Factory implements PartitionReleaseStrategy.Factory {

@Override
public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy) {

final Set<? extends Set<SchedulingExecutionVertex>> distinctRegions =
PipelinedRegionComputeUtil.computePipelinedRegions(schedulingStrategy);

return new RegionPartitionReleaseStrategy(
schedulingStrategy,
PipelinedRegionComputeUtil.toPipelinedRegionsSet(distinctRegions));
return new RegionPartitionReleaseStrategy(schedulingStrategy);
}
}
}
Loading

0 comments on commit 95b3c95

Please sign in to comment.