From db436d93797ef1422444ff4d92f966865c264cc3 Mon Sep 17 00:00:00 2001 From: Yun Tang Date: Fri, 18 Oct 2019 20:45:50 +0800 Subject: [PATCH] [FLINK-13601][tests] Harden RegionFailoverITCase Use CompletedCheckpointStore to record completed checkpoints, since it's more reliable than notifications returned to the task. --- .../checkpointing/RegionFailoverITCase.java | 73 ++++++++++++++++--- 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 1e72ee37768c9..181743a25d37d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -29,11 +29,19 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import 
org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -63,6 +71,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -104,6 +113,8 @@ public class RegionFailoverITCase extends TestLogger { public void setup() throws Exception { Configuration configuration = new Configuration(); configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); + configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName()); + // global failover times: 3, region failover times: NUM_OF_RESTARTS configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3); configuration.setString(RestartStrategyOptions.RESTART_STRATEGY, FailingRestartStrategy.class.getName()); @@ -189,7 +200,8 @@ private JobGraph createJobGraph() { } private static class StringGeneratingSourceFunction extends RichParallelSourceFunction> - implements CheckpointListener, CheckpointedFunction { + implements CheckpointedFunction { + private static final long serialVersionUID = 1L; private final long numElements; @@ -259,15 +271,6 @@ public void cancel() { isRunning = false; } - @Override - public void notifyCheckpointComplete(long checkpointId) { - if (getRuntimeContext().getIndexOfThisSubtask() == NUM_OF_REGIONS - 1) { - lastCompletedCheckpointId.set(checkpointId); - snapshotIndicesOfSubTask.put(checkpointId, lastRegionIndex); - numCompletedCheckpoints.incrementAndGet(); - } - } - @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); @@ -276,6 +279,7 @@ public void 
snapshotState(FunctionSnapshotContext context) throws Exception { listState.add(index); if (indexOfThisSubtask == NUM_OF_REGIONS - 1) { lastRegionIndex = index; + snapshotIndicesOfSubTask.put(context.getCheckpointId(), lastRegionIndex); } } unionListState.clear(); @@ -403,4 +407,51 @@ public void restoreState(List> state) throws Exception private static class TestException extends IOException{ private static final long serialVersionUID = 1L; } + + private static class TestingHaServices extends EmbeddedHaServices { + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + + TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) { + super(executor); + this.checkpointRecoveryFactory = checkpointRecoveryFactory; + } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { + return checkpointRecoveryFactory; + } + } + + /** + * An extension of {@link StandaloneCompletedCheckpointStore} which would record information + * of last completed checkpoint id and the number of completed checkpoints. + */ + private static class TestingCompletedCheckpointStore extends StandaloneCompletedCheckpointStore { + + TestingCompletedCheckpointStore() { + super(1); + } + + @Override + public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { + super.addCheckpoint(checkpoint); + // we record the information when adding completed checkpoint instead of 'notifyCheckpointComplete' invoked + // on task side to avoid race condition. See FLINK-13601. + lastCompletedCheckpointId.set(checkpoint.getCheckpointID()); + numCompletedCheckpoints.incrementAndGet(); + } + } + + /** + * Testing HA factory which needs to be public in order to be instantiatable. 
+ */ + public static class TestingHAFactory implements HighAvailabilityServicesFactory { + + @Override + public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) { + return new TestingHaServices( + new TestingCheckpointRecoveryFactory(new TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()), + executor); + } + } }