From f9c23a0b86121d6361df403a05f75ba4b3902735 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Tue, 21 Apr 2020 15:59:27 +0200 Subject: [PATCH] [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy Avoid re-computing pipelined regions in the RestartPipelinedRegionFailoverStrategy using the PipelinedRegionComputeUtil. Instead, rely on the pipelined regions provided by the Topology. This closes #11857. --- .../failover/flip1/FailoverRegion.java | 68 ---------- ...estartPipelinedRegionFailoverStrategy.java | 63 +++------ ...nedRegionFailoverStrategyBuildingTest.java | 125 +++++++++--------- 3 files changed, 78 insertions(+), 178 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java deleted file mode 100644 index d1efb6f5cf51f..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph.failover.flip1; - -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; - -import java.util.HashSet; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * FailoverRegion is a subset of all the vertices in the job topology. - */ -public class FailoverRegion { - - /** All vertex IDs in this region. */ - private final Set executionVertexIDs; - - /** All vertices in this region. */ - private final Set executionVertices; - - /** - * Creates a new failover region containing a set of vertices. - * - * @param executionVertices to be contained in this region - */ - public FailoverRegion(Set executionVertices) { - this.executionVertices = checkNotNull(executionVertices); - this.executionVertexIDs = new HashSet<>(); - executionVertices.forEach(v -> this.executionVertexIDs.add(v.getId())); - } - - /** - * Returns IDs of all vertices in this region. - * - * @return IDs of all vertices in this region - */ - public Set getAllExecutionVertexIDs() { - return executionVertexIDs; - } - - /** - * Returns all vertices in this region. - * - * @return all vertices in this region - */ - public Set getAllExecutionVertices() { - return executionVertices; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java index 3c158f0d1d1a1..eb8d06b12b15f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.util.ExceptionUtils; @@ -31,10 +32,8 @@ import java.util.ArrayDeque; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; @@ -53,12 +52,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy /** The topology containing info about all the vertices and result partitions. */ private final SchedulingTopology topology; - /** All failover regions. */ - private final Set regions; - - /** Maps execution vertex id to failover region. */ - private final Map vertexToRegionMap; - /** The checker helps to query result partition availability. */ private final RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker; @@ -84,34 +77,8 @@ public RestartPipelinedRegionFailoverStrategy( ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { this.topology = checkNotNull(topology); - this.regions = Collections.newSetFromMap(new IdentityHashMap<>()); - this.vertexToRegionMap = new HashMap<>(); this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker( resultPartitionAvailabilityChecker); - - // build regions based on the given topology - LOG.info("Start building failover regions."); - buildFailoverRegions(); - } - // ------------------------------------------------------------------------ - // region building - // ------------------------------------------------------------------------ - - private void buildFailoverRegions() { - final Set> distinctRegions = - PipelinedRegionComputeUtil.computePipelinedRegions(topology); - - // creating all the failover regions and register them - for (Set regionVertices : distinctRegions) { - LOG.debug("Creating a failover region with {} vertices.", regionVertices.size()); - final FailoverRegion failoverRegion = new FailoverRegion(regionVertices); - regions.add(failoverRegion); - for (SchedulingExecutionVertex vertex : regionVertices) { - vertexToRegionMap.put(vertex.getId(), failoverRegion); - } - } - - LOG.info("Created {} failover regions.", regions.size()); } @@ -136,7 +103,7 @@ private void buildFailoverRegions() { public Set getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) { LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId); - final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId); + final SchedulingPipelinedRegion failedRegion = topology.getPipelinedRegionOfVertex(executionVertexId); if (failedRegion == null) { // TODO: show the task name in the log throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause); @@ -153,8 +120,8 @@ public Set getTasksNeedingRestart(ExecutionVertexID execution // calculate the tasks to restart based on the result of regions to restart Set tasksToRestart = new HashSet<>(); - for (FailoverRegion region : getRegionsToRestart(failedRegion)) { - tasksToRestart.addAll(region.getAllExecutionVertexIDs()); + for (SchedulingPipelinedRegion region : getRegionsToRestart(failedRegion)) { + region.getVertices().forEach(vertex -> tasksToRestart.add(vertex.getId())); } // the previous failed partition will be recovered. remove its failed state from the checker @@ -175,25 +142,25 @@ public Set getTasksNeedingRestart(ExecutionVertexID execution * the region containing the partition producer task is involved * 3. If a region is involved, all of its consumer regions are involved */ - private Set getRegionsToRestart(FailoverRegion failedRegion) { - Set regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>()); - Set visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>()); + private Set getRegionsToRestart(SchedulingPipelinedRegion failedRegion) { + Set regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>()); + Set visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>()); // start from the failed region to visit all involved regions - Queue regionsToVisit = new ArrayDeque<>(); + Queue regionsToVisit = new ArrayDeque<>(); visitedRegions.add(failedRegion); regionsToVisit.add(failedRegion); while (!regionsToVisit.isEmpty()) { - FailoverRegion regionToRestart = regionsToVisit.poll(); + SchedulingPipelinedRegion regionToRestart = regionsToVisit.poll(); // an involved region should be restarted regionsToRestart.add(regionToRestart); // if a needed input result partition is not available, its producer region is involved - for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) { + for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) { for (SchedulingResultPartition consumedPartition : vertex.getConsumedResults()) { if (!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) { - FailoverRegion producerRegion = vertexToRegionMap.get(consumedPartition.getProducer().getId()); + SchedulingPipelinedRegion producerRegion = topology.getPipelinedRegionOfVertex(consumedPartition.getProducer().getId()); if (!visitedRegions.contains(producerRegion)) { visitedRegions.add(producerRegion); regionsToVisit.add(producerRegion); @@ -203,10 +170,10 @@ private Set getRegionsToRestart(FailoverRegion failedRegion) { } // all consumer regions of an involved region should be involved - for (SchedulingExecutionVertex vertex : regionToRestart.getAllExecutionVertices()) { + for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) { for (SchedulingResultPartition producedPartition : vertex.getProducedResults()) { for (SchedulingExecutionVertex consumerVertex : producedPartition.getConsumers()) { - FailoverRegion consumerRegion = vertexToRegionMap.get(consumerVertex.getId()); + SchedulingPipelinedRegion consumerRegion = topology.getPipelinedRegionOfVertex(consumerVertex.getId()); if (!visitedRegions.contains(consumerRegion)) { visitedRegions.add(consumerRegion); regionsToVisit.add(consumerRegion); @@ -229,8 +196,8 @@ private Set getRegionsToRestart(FailoverRegion failedRegion) { * @return the failover region that contains the given execution vertex */ @VisibleForTesting - public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) { - return vertexToRegionMap.get(vertexID); + public SchedulingPipelinedRegion getFailoverRegion(ExecutionVertexID vertexID) { + return topology.getPipelinedRegionOfVertex(vertexID); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java index a9cdcffb89700..d3fa17e984358 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyBuildingTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; import org.apache.flink.util.TestLogger; @@ -55,9 +56,9 @@ public void testIndividualVertices() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion r1 = strategy.getFailoverRegion(v1.getId()); - FailoverRegion r2 = strategy.getFailoverRegion(v2.getId()); - FailoverRegion r3 = strategy.getFailoverRegion(v3.getId()); + SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId()); + SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId()); + SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId()); assertDistinctRegions(r1, r2, r3); } @@ -91,12 +92,12 @@ public void testEmbarrassinglyParallelCase() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion ra3 = strategy.getFailoverRegion(va3.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); - FailoverRegion rb3 = strategy.getFailoverRegion(vb3.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion ra3 = strategy.getFailoverRegion(va3.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion rb3 = strategy.getFailoverRegion(vb3.getId()); assertSameRegion(ra1, rb1); assertSameRegion(ra2, rb2); @@ -138,12 +139,12 @@ public void testOneComponentViaTwoExchanges() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); - FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId()); - FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId()); + SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId()); assertSameRegion(ra1, ra2, rb1, rb2, rc1, rc2); } @@ -184,13 +185,13 @@ public void testOneComponentViaCascadeOfJoins() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion r1 = strategy.getFailoverRegion(v1.getId()); - FailoverRegion r2 = strategy.getFailoverRegion(v2.getId()); - FailoverRegion r3 = strategy.getFailoverRegion(v3.getId()); - FailoverRegion r4 = strategy.getFailoverRegion(v4.getId()); - FailoverRegion r5 = strategy.getFailoverRegion(v5.getId()); - FailoverRegion r6 = strategy.getFailoverRegion(v6.getId()); - FailoverRegion r7 = strategy.getFailoverRegion(v7.getId()); + SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId()); + SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId()); + SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId()); + SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId()); + SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId()); + SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId()); + SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId()); assertSameRegion(r1, r2, r3, r4, r5, r6, r7); } @@ -231,13 +232,13 @@ public void testOneComponentInstanceFromOneSource() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion r1 = strategy.getFailoverRegion(v1.getId()); - FailoverRegion r2 = strategy.getFailoverRegion(v2.getId()); - FailoverRegion r3 = strategy.getFailoverRegion(v3.getId()); - FailoverRegion r4 = strategy.getFailoverRegion(v4.getId()); - FailoverRegion r5 = strategy.getFailoverRegion(v5.getId()); - FailoverRegion r6 = strategy.getFailoverRegion(v6.getId()); - FailoverRegion r7 = strategy.getFailoverRegion(v7.getId()); + SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId()); + SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId()); + SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId()); + SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId()); + SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId()); + SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId()); + SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId()); assertSameRegion(r1, r2, r3, r4, r5, r6, r7); } @@ -275,12 +276,12 @@ public void testTwoComponentsViaBlockingExchange() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); - FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId()); - FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId()); + SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId()); assertSameRegion(ra1, ra2, rb1, rb2); @@ -322,12 +323,12 @@ public void testTwoComponentsViaBlockingExchange2() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); - FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getId()); - FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion rc1 = strategy.getFailoverRegion(vc1.getId()); + SchedulingPipelinedRegion rc2 = strategy.getFailoverRegion(vc2.getId()); assertSameRegion(ra1, ra2, rb1, rb2); @@ -374,13 +375,13 @@ public void testMultipleComponentsViaCascadeOfJoins() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion r1 = strategy.getFailoverRegion(v1.getId()); - FailoverRegion r2 = strategy.getFailoverRegion(v2.getId()); - FailoverRegion r3 = strategy.getFailoverRegion(v3.getId()); - FailoverRegion r4 = strategy.getFailoverRegion(v4.getId()); - FailoverRegion r5 = strategy.getFailoverRegion(v5.getId()); - FailoverRegion r6 = strategy.getFailoverRegion(v6.getId()); - FailoverRegion r7 = strategy.getFailoverRegion(v7.getId()); + SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId()); + SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId()); + SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId()); + SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId()); + SchedulingPipelinedRegion r5 = strategy.getFailoverRegion(v5.getId()); + SchedulingPipelinedRegion r6 = strategy.getFailoverRegion(v6.getId()); + SchedulingPipelinedRegion r7 = strategy.getFailoverRegion(v7.getId()); assertSameRegion(r1, r2, r5); assertSameRegion(r3, r4, r6); @@ -418,10 +419,10 @@ public void testDiamondWithMixedPipelinedAndBlockingExchanges() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion r1 = strategy.getFailoverRegion(v1.getId()); - FailoverRegion r2 = strategy.getFailoverRegion(v2.getId()); - FailoverRegion r3 = strategy.getFailoverRegion(v3.getId()); - FailoverRegion r4 = strategy.getFailoverRegion(v4.getId()); + SchedulingPipelinedRegion r1 = strategy.getFailoverRegion(v1.getId()); + SchedulingPipelinedRegion r2 = strategy.getFailoverRegion(v2.getId()); + SchedulingPipelinedRegion r3 = strategy.getFailoverRegion(v3.getId()); + SchedulingPipelinedRegion r4 = strategy.getFailoverRegion(v4.getId()); assertSameRegion(r1, r2, r3, r4); } @@ -459,10 +460,10 @@ public void testBlockingAllToAllTopologyWithCoLocation() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); assertSameRegion(ra1, ra2, rb1, rb2); } @@ -494,10 +495,10 @@ public void testPipelinedOneToOneTopologyWithCoLocation() { RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology); - FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId()); - FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId()); - FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getId()); - FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getId()); + SchedulingPipelinedRegion ra1 = strategy.getFailoverRegion(va1.getId()); + SchedulingPipelinedRegion ra2 = strategy.getFailoverRegion(va2.getId()); + SchedulingPipelinedRegion rb1 = strategy.getFailoverRegion(vb1.getId()); + SchedulingPipelinedRegion rb2 = strategy.getFailoverRegion(vb2.getId()); assertSameRegion(ra1, ra2, rb1, rb2); } @@ -506,7 +507,7 @@ public void testPipelinedOneToOneTopologyWithCoLocation() { // utilities // ------------------------------------------------------------------------ - public static void assertSameRegion(FailoverRegion ...regions) { + public static void assertSameRegion(SchedulingPipelinedRegion ...regions) { checkNotNull(regions); for (int i = 0; i < regions.length; i++) { for (int j = i + 1; i < regions.length; i++) { @@ -515,7 +516,7 @@ public static void assertSameRegion(FailoverRegion ...regions) { } } - public static void assertDistinctRegions(FailoverRegion ...regions) { + public static void assertDistinctRegions(SchedulingPipelinedRegion ...regions) { checkNotNull(regions); for (int i = 0; i < regions.length; i++) { for (int j = i + 1; j < regions.length; j++) {