Skip to content

Commit

Permalink
[FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface
Browse files Browse the repository at this point in the history
Implement interfaces

  - Toplogy#getAllPipelinedRegions()
  - Topology#getPipelinedRegionOfVertex(ExecutionVertexID)

in DefaultExecutionTopology to enable retrieval of pipelined regions.
  • Loading branch information
GJL committed Apr 24, 2020
1 parent fd13c2c commit 23c13bb
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -41,6 +48,8 @@
*/
public class DefaultExecutionTopology implements SchedulingTopology {

private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionTopology.class);

private final boolean containsCoLocationConstraints;

private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
Expand All @@ -49,6 +58,10 @@ public class DefaultExecutionTopology implements SchedulingTopology {

private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;

private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex;

private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions;

public DefaultExecutionTopology(ExecutionGraph graph) {
checkNotNull(graph, "execution graph can not be null");

Expand All @@ -74,6 +87,28 @@ public DefaultExecutionTopology(ExecutionGraph graph) {
this.resultPartitionsById = tmpResultPartitionsById;

connectVerticesToConsumedPartitions(executionVertexMap, tmpResultPartitionsById);

this.pipelinedRegionsByVertex = new HashMap<>();
this.pipelinedRegions = new ArrayList<>();
initializePipelinedRegions();
}

private void initializePipelinedRegions() {
final long buildRegionsStartTime = System.nanoTime();

final Set<Set<SchedulingExecutionVertex>> rawPipelinedRegions = PipelinedRegionComputeUtil.computePipelinedRegions(this);
for (Set<? extends SchedulingExecutionVertex> rawPipelinedRegion : rawPipelinedRegions) {
//noinspection unchecked
final DefaultSchedulingPipelinedRegion pipelinedRegion = new DefaultSchedulingPipelinedRegion((Set<DefaultExecutionVertex>) rawPipelinedRegion);
pipelinedRegions.add(pipelinedRegion);

for (SchedulingExecutionVertex executionVertex : rawPipelinedRegion) {
pipelinedRegionsByVertex.put(executionVertex.getId(), pipelinedRegion);
}
}

final long buildRegionsDuration = (System.nanoTime() - buildRegionsStartTime) / 1_000_000;
LOG.info("Built {} pipelined regions in {} ms", pipelinedRegions.size(), buildRegionsDuration);
}

@Override
Expand Down Expand Up @@ -104,6 +139,20 @@ public DefaultResultPartition getResultPartition(final IntermediateResultPartiti
return resultPartition;
}

@Override
public Iterable<DefaultSchedulingPipelinedRegion> getAllPipelinedRegions() {
return Collections.unmodifiableCollection(pipelinedRegions);
}

@Override
public DefaultSchedulingPipelinedRegion getPipelinedRegionOfVertex(final ExecutionVertexID vertexId) {
final DefaultSchedulingPipelinedRegion pipelinedRegion = pipelinedRegionsByVertex.get(vertexId);
if (pipelinedRegion == null) {
throw new IllegalArgumentException("Unknown execution vertex " + vertexId);
}
return pipelinedRegion;
}

private static List<DefaultResultPartition> generateProducedSchedulingResultPartition(
Map<IntermediateResultPartitionID, IntermediateResultPartition> producedIntermediatePartitions) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.scheduler.adapter;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Default implementation of {@link SchedulingPipelinedRegion}.
*/
public class DefaultSchedulingPipelinedRegion implements SchedulingPipelinedRegion {

private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertices;

private Set<DefaultResultPartition> consumedResults;

public DefaultSchedulingPipelinedRegion(Set<DefaultExecutionVertex> defaultExecutionVertices) {
Preconditions.checkNotNull(defaultExecutionVertices);

this.executionVertices = new HashMap<>();
for (DefaultExecutionVertex executionVertex : defaultExecutionVertices) {
this.executionVertices.put(executionVertex.getId(), executionVertex);
}
}

@Override
public Iterable<DefaultExecutionVertex> getVertices() {
return Collections.unmodifiableCollection(executionVertices.values());
}

@Override
public DefaultExecutionVertex getVertex(final ExecutionVertexID vertexId) {
final DefaultExecutionVertex executionVertex = executionVertices.get(vertexId);
if (executionVertex == null) {
throw new IllegalArgumentException(String.format(
"Execution vertex %s not found in pipelined region",
vertexId));
}
return executionVertex;
}

@Override
public Iterable<DefaultResultPartition> getConsumedResults() {
if (consumedResults == null) {
initializeConsumedResults();
}
return consumedResults;
}

private void initializeConsumedResults() {
final Set<DefaultResultPartition> consumedResults = new HashSet<>();
for (DefaultExecutionVertex executionVertex : executionVertices.values()) {
for (DefaultResultPartition resultPartition : executionVertex.getConsumedResults()) {
if (!executionVertices.containsKey(resultPartition.getProducer().getId())) {
consumedResults.add(resultPartition);
}
}
}
this.consumedResults = Collections.unmodifiableSet(consumedResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,6 +45,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -154,6 +156,25 @@ public void testWithoutCoLocationConstraints() {
assertFalse(adapter.containsCoLocationConstraints());
}

@Test
public void testGetAllPipelinedRegions() {
final Iterable<DefaultSchedulingPipelinedRegion> allPipelinedRegions = adapter.getAllPipelinedRegions();
assertEquals(1, Iterables.size(allPipelinedRegions));
}

@Test
public void testGetPipelinedRegionOfVertex() {
for (DefaultExecutionVertex vertex : adapter.getVertices()) {
final DefaultSchedulingPipelinedRegion pipelinedRegion = adapter.getPipelinedRegionOfVertex(vertex.getId());
assertRegionContainsAllVertices(pipelinedRegion);
}
}

private void assertRegionContainsAllVertices(final DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) {
final Set<DefaultExecutionVertex> allVertices = Sets.newHashSet(pipelinedRegionOfVertex.getVertices());
assertEquals(Sets.newHashSet(adapter.getVertices()), allVertices);
}

private ExecutionGraph createExecutionGraphWithCoLocationConstraint() throws Exception {
JobVertex[] jobVertices = new JobVertex[2];
int parallelism = 3;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.scheduler.adapter;

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;

import org.junit.Test;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* Unit tests for {@link DefaultSchedulingPipelinedRegion}.
*/
public class DefaultSchedulingPipelinedRegionTest extends TestLogger {

@Test
public void gettingUnknownVertexThrowsException() {
final DefaultSchedulingPipelinedRegion pipelinedRegion = new DefaultSchedulingPipelinedRegion(Collections.emptySet());
final ExecutionVertexID unknownVertexId = new ExecutionVertexID(new JobVertexID(), 0);
try {
pipelinedRegion.getVertex(unknownVertexId);
fail("Expected exception not thrown");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString(unknownVertexId + " not found"));
}
}

@Test
public void returnsVertices() {
final DefaultExecutionVertex vertex = new DefaultExecutionVertex(
new ExecutionVertexID(new JobVertexID(), 0),
Collections.emptyList(),
() -> ExecutionState.CREATED,
InputDependencyConstraint.ANY);

final Set<DefaultExecutionVertex> vertices = Collections.singleton(vertex);
final DefaultSchedulingPipelinedRegion pipelinedRegion = new DefaultSchedulingPipelinedRegion(vertices);
final Iterator<DefaultExecutionVertex> vertexIterator = pipelinedRegion.getVertices().iterator();

assertThat(vertexIterator.hasNext(), is(true));
assertThat(vertexIterator.next(), is(sameInstance(vertex)));
assertThat(vertexIterator.hasNext(), is(false));
}

/**
* Tests if the consumed inputs of the pipelined regions are computed
* correctly using the Job graph below.
* <pre>
* c
* / X
* a -+- b e
* \ /
* d
* </pre>
* Pipelined regions: {a}, {b, c, d, e}
*/
@Test
public void returnsIncidentBlockingPartitions() throws Exception {
final JobVertex a = ExecutionGraphTestUtils.createNoOpVertex(1);
final JobVertex b = ExecutionGraphTestUtils.createNoOpVertex(1);
final JobVertex c = ExecutionGraphTestUtils.createNoOpVertex(1);
final JobVertex d = ExecutionGraphTestUtils.createNoOpVertex(1);
final JobVertex e = ExecutionGraphTestUtils.createNoOpVertex(1);

b.connectNewDataSetAsInput(a, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
c.connectNewDataSetAsInput(b, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
d.connectNewDataSetAsInput(b, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
e.connectNewDataSetAsInput(c, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
e.connectNewDataSetAsInput(d, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final ExecutionGraph simpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(a, b, c, d, e);
final DefaultExecutionTopology topology = new DefaultExecutionTopology(simpleTestGraph);

final DefaultSchedulingPipelinedRegion firstPipelinedRegion = topology.getPipelinedRegionOfVertex(new ExecutionVertexID(a.getID(), 0));
final DefaultSchedulingPipelinedRegion secondPipelinedRegion = topology.getPipelinedRegionOfVertex(new ExecutionVertexID(e.getID(), 0));

final DefaultExecutionVertex vertexB0 = topology.getVertex(new ExecutionVertexID(b.getID(), 0));
final IntermediateResultPartitionID b0ConsumedResultPartition = Iterables.getOnlyElement(vertexB0.getConsumedResults()).getId();

final Set<IntermediateResultPartitionID> secondPipelinedRegionConsumedResults = IterableUtils.toStream(secondPipelinedRegion.getConsumedResults())
.map(DefaultResultPartition::getId)
.collect(Collectors.toSet());

assertThat(firstPipelinedRegion.getConsumedResults().iterator().hasNext(), is(false));
assertThat(secondPipelinedRegionConsumedResults, contains(b0ConsumedResultPartition));
}
}

0 comments on commit 23c13bb

Please sign in to comment.