Skip to content

Commit

Permalink
[FLINK-17369][tests] In RestartPipelinedRegionFailoverStrategyBuildin…
Browse files Browse the repository at this point in the history
…gTest invoke PipelinedRegionComputeUtil directly

RestartPipelinedRegionFailoverStrategyBuildingTest means to test the
logic in PipelinedRegionComputeUtil. Currently, however, the pipelined
regions are retrieved indirectly from
RestartPipelinedRegionFailoverStrategy. This commit changes that by
using the PipelinedRegionComputeUtil directly from the test.
  • Loading branch information
GJL committed May 11, 2020
1 parent ce6b97e commit cba4845
Showing 1 changed file with 96 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
Expand Down Expand Up @@ -54,11 +59,11 @@ public void testIndividualVertices() {
TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId());
SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId());
SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId());
Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());

assertDistinctRegions(r1, r2, r3);
}
Expand Down Expand Up @@ -90,14 +95,14 @@ public void testEmbarrassinglyParallelCase() {
.connect(va2, vb2, ResultPartitionType.PIPELINED)
.connect(va3, vb3, ResultPartitionType.PIPELINED);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion ra3 = strategy.getFailoverRegion(va3.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
SchedulingPipelinedRegion rb3 = strategy.getFailoverRegion(vb3.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> ra3 = pipelinedRegionByVertex.get(va3.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());
Set<SchedulingExecutionVertex> rb3 = pipelinedRegionByVertex.get(vb3.getId());

assertSameRegion(ra1, rb1);
assertSameRegion(ra2, rb2);
Expand Down Expand Up @@ -137,14 +142,14 @@ public void testOneComponentViaTwoExchanges() {
.connect(vb2, vc1, ResultPartitionType.PIPELINED)
.connect(vb2, vc2, ResultPartitionType.PIPELINED);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId());
SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());
Set<SchedulingExecutionVertex> rc1 = pipelinedRegionByVertex.get(vc1.getId());
Set<SchedulingExecutionVertex> rc2 = pipelinedRegionByVertex.get(vc2.getId());

assertSameRegion(ra1, ra2, rb1, rb2, rc1, rc2);
}
Expand Down Expand Up @@ -183,15 +188,15 @@ public void testOneComponentViaCascadeOfJoins() {
.connect(v5, v7, ResultPartitionType.PIPELINED)
.connect(v6, v7, ResultPartitionType.PIPELINED);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId());
SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId());
SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId());
SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId());
SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId());
SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId());
SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId());
Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());
Set<SchedulingExecutionVertex> r4 = pipelinedRegionByVertex.get(v4.getId());
Set<SchedulingExecutionVertex> r5 = pipelinedRegionByVertex.get(v5.getId());
Set<SchedulingExecutionVertex> r6 = pipelinedRegionByVertex.get(v6.getId());
Set<SchedulingExecutionVertex> r7 = pipelinedRegionByVertex.get(v7.getId());

assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
}
Expand Down Expand Up @@ -230,15 +235,15 @@ public void testOneComponentInstanceFromOneSource() {
.connect(v3, v6, ResultPartitionType.PIPELINED)
.connect(v3, v7, ResultPartitionType.PIPELINED);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId());
SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId());
SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId());
SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId());
SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId());
SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId());
SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId());
Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());
Set<SchedulingExecutionVertex> r4 = pipelinedRegionByVertex.get(v4.getId());
Set<SchedulingExecutionVertex> r5 = pipelinedRegionByVertex.get(v5.getId());
Set<SchedulingExecutionVertex> r6 = pipelinedRegionByVertex.get(v6.getId());
Set<SchedulingExecutionVertex> r7 = pipelinedRegionByVertex.get(v7.getId());

assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
}
Expand Down Expand Up @@ -274,14 +279,14 @@ public void testTwoComponentsViaBlockingExchange() {
.connect(vb1, vc1, ResultPartitionType.BLOCKING)
.connect(vb2, vc2, ResultPartitionType.BLOCKING);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId());
SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());
Set<SchedulingExecutionVertex> rc1 = pipelinedRegionByVertex.get(vc1.getId());
Set<SchedulingExecutionVertex> rc2 = pipelinedRegionByVertex.get(vc2.getId());

assertSameRegion(ra1, ra2, rb1, rb2);

Expand Down Expand Up @@ -321,14 +326,14 @@ public void testTwoComponentsViaBlockingExchange2() {
.connect(vb2, vc1, ResultPartitionType.BLOCKING)
.connect(vb2, vc2, ResultPartitionType.BLOCKING);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId());
SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());
Set<SchedulingExecutionVertex> rc1 = pipelinedRegionByVertex.get(vc1.getId());
Set<SchedulingExecutionVertex> rc2 = pipelinedRegionByVertex.get(vc2.getId());

assertSameRegion(ra1, ra2, rb1, rb2);

Expand Down Expand Up @@ -373,15 +378,15 @@ public void testMultipleComponentsViaCascadeOfJoins() {
.connect(v5, v7, ResultPartitionType.BLOCKING)
.connect(v6, v7, ResultPartitionType.BLOCKING);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId());
SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId());
SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId());
SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId());
SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId());
SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId());
SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId());
Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());
Set<SchedulingExecutionVertex> r4 = pipelinedRegionByVertex.get(v4.getId());
Set<SchedulingExecutionVertex> r5 = pipelinedRegionByVertex.get(v5.getId());
Set<SchedulingExecutionVertex> r6 = pipelinedRegionByVertex.get(v6.getId());
Set<SchedulingExecutionVertex> r7 = pipelinedRegionByVertex.get(v7.getId());

assertSameRegion(r1, r2, r5);
assertSameRegion(r3, r4, r6);
Expand Down Expand Up @@ -417,12 +422,12 @@ public void testDiamondWithMixedPipelinedAndBlockingExchanges() {
.connect(v2, v4, ResultPartitionType.PIPELINED)
.connect(v3, v4, ResultPartitionType.PIPELINED);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId());
SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId());
SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId());
SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId());
Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());
Set<SchedulingExecutionVertex> r4 = pipelinedRegionByVertex.get(v4.getId());

assertSameRegion(r1, r2, r3, r4);
}
Expand Down Expand Up @@ -458,12 +463,12 @@ public void testBlockingAllToAllTopologyWithCoLocation() {

topology.setContainsCoLocationConstraints(true);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());

assertSameRegion(ra1, ra2, rb1, rb2);
}
Expand Down Expand Up @@ -493,12 +498,12 @@ public void testPipelinedOneToOneTopologyWithCoLocation() {

topology.setContainsCoLocationConstraints(true);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);
Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId());
SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId());
SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId());
SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId());
Set<SchedulingExecutionVertex> ra1 = pipelinedRegionByVertex.get(va1.getId());
Set<SchedulingExecutionVertex> ra2 = pipelinedRegionByVertex.get(va2.getId());
Set<SchedulingExecutionVertex> rb1 = pipelinedRegionByVertex.get(vb1.getId());
Set<SchedulingExecutionVertex> rb2 = pipelinedRegionByVertex.get(vb2.getId());

assertSameRegion(ra1, ra2, rb1, rb2);
}
Expand All @@ -507,7 +512,23 @@ public void testPipelinedOneToOneTopologyWithCoLocation() {
// utilities
// ------------------------------------------------------------------------

public static void assertSameRegion(SchedulingPipelinedRegion ...regions) {
private static Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> computePipelinedRegionByVertex(final TestingSchedulingTopology topology) {
final Set<Set<SchedulingExecutionVertex>> regions = PipelinedRegionComputeUtil.computePipelinedRegions(topology);
return computePipelinedRegionByVertex(regions);
}

private static Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> computePipelinedRegionByVertex(final Set<Set<SchedulingExecutionVertex>> regions) {
final Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = new HashMap<>();
for (Set<SchedulingExecutionVertex> region : regions) {
for (SchedulingExecutionVertex vertex : region) {
pipelinedRegionByVertex.put(vertex.getId(), region);
}
}
return pipelinedRegionByVertex;
}

@SafeVarargs
public static void assertSameRegion(Set<SchedulingExecutionVertex>... regions) {
checkNotNull(regions);
for (int i = 0; i < regions.length; i++) {
for (int j = i + 1; i < regions.length; i++) {
Expand All @@ -516,7 +537,8 @@ public static void assertSameRegion(SchedulingPipelinedRegion ...regions) {
}
}

public static void assertDistinctRegions(SchedulingPipelinedRegion ...regions) {
@SafeVarargs
public static void assertDistinctRegions(Set<SchedulingExecutionVertex>... regions) {
checkNotNull(regions);
for (int i = 0; i < regions.length; i++) {
for (int j = i + 1; j < regions.length; j++) {
Expand Down

0 comments on commit cba4845

Please sign in to comment.