Skip to content

Commit

Permalink
[FLINK-13601][tests] Harden RegionFailoverITCase
Browse files Browse the repository at this point in the history
Use CompletedCheckpointStore to record completed checkpoints, since it is more reliable than the notifications returned to the task.
  • Loading branch information
Myasuka authored and zentol committed Oct 18, 2019
1 parent bf473a4 commit db436d9
Showing 1 changed file with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,19 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
Expand Down Expand Up @@ -63,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -104,6 +113,8 @@ public class RegionFailoverITCase extends TestLogger {
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

// global failover times: 3, region failover times: NUM_OF_RESTARTS
configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3);
configuration.setString(RestartStrategyOptions.RESTART_STRATEGY, FailingRestartStrategy.class.getName());
Expand Down Expand Up @@ -189,7 +200,8 @@ private JobGraph createJobGraph() {
}

private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<Tuple2<Integer, Integer>>
implements CheckpointListener, CheckpointedFunction {
implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private final long numElements;
Expand Down Expand Up @@ -259,15 +271,6 @@ public void cancel() {
isRunning = false;
}

/**
 * Records a completed checkpoint, but only on the subtask whose index is
 * {@code NUM_OF_REGIONS - 1}; every other subtask ignores the notification.
 *
 * @param checkpointId id of the checkpoint reported as complete
 */
@Override
public void notifyCheckpointComplete(long checkpointId) {
    final boolean isLastRegionSubtask =
        getRuntimeContext().getIndexOfThisSubtask() == NUM_OF_REGIONS - 1;
    if (!isLastRegionSubtask) {
        return;
    }

    // Remember the latest completed id, the index snapshotted for it, and bump the counter.
    lastCompletedCheckpointId.set(checkpointId);
    snapshotIndicesOfSubTask.put(checkpointId, lastRegionIndex);
    numCompletedCheckpoints.incrementAndGet();
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
Expand All @@ -276,6 +279,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
listState.add(index);
if (indexOfThisSubtask == NUM_OF_REGIONS - 1) {
lastRegionIndex = index;
snapshotIndicesOfSubTask.put(context.getCheckpointId(), lastRegionIndex);
}
}
unionListState.clear();
Expand Down Expand Up @@ -403,4 +407,51 @@ public void restoreState(List<HashMap<Integer, Integer>> state) throws Exception
/**
 * Dedicated {@link IOException} subtype for this test. It adds no state or behaviour of its
 * own; it only provides a distinct exception type.
 */
private static class TestException extends IOException {

    private static final long serialVersionUID = 1L;
}

/**
 * {@link EmbeddedHaServices} variant that hands out a caller-supplied
 * {@link CheckpointRecoveryFactory} instead of whatever the parent class would return.
 */
private static class TestingHaServices extends EmbeddedHaServices {

    /** Factory returned from {@link #getCheckpointRecoveryFactory()}. */
    private final CheckpointRecoveryFactory recoveryFactory;

    /**
     * @param checkpointRecoveryFactory factory to expose through {@link #getCheckpointRecoveryFactory()}
     * @param executor executor forwarded to the {@link EmbeddedHaServices} constructor
     */
    TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) {
        super(executor);
        recoveryFactory = checkpointRecoveryFactory;
    }

    /** Returns the factory that was passed to the constructor. */
    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        return recoveryFactory;
    }
}

/**
 * A {@link StandaloneCompletedCheckpointStore} that additionally records the id of the most
 * recently added checkpoint and counts how many checkpoints have been added.
 */
private static class TestingCompletedCheckpointStore extends StandaloneCompletedCheckpointStore {

    TestingCompletedCheckpointStore() {
        // NOTE(review): 1 is presumably the maximum number of retained checkpoints -
        // confirm against the StandaloneCompletedCheckpointStore constructor.
        super(1);
    }

    /**
     * Adds the checkpoint to the underlying store and then updates the test's bookkeeping.
     *
     * @param checkpoint the completed checkpoint being added
     * @throws Exception if the parent store fails to add the checkpoint; the bookkeeping is
     *     then left untouched
     */
    @Override
    public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
        super.addCheckpoint(checkpoint);

        // The bookkeeping is done here, when the completed checkpoint is added, rather than in
        // the task-side 'notifyCheckpointComplete' callback, to avoid a race condition.
        // See FLINK-13601.
        final long addedCheckpointId = checkpoint.getCheckpointID();
        lastCompletedCheckpointId.set(addedCheckpointId);
        numCompletedCheckpoints.incrementAndGet();
    }
}

/**
 * {@link HighAvailabilityServicesFactory} that builds {@link TestingHaServices} backed by a
 * {@link TestingCompletedCheckpointStore}. The class has to be public so that it can be
 * instantiated.
 */
public static class TestingHAFactory implements HighAvailabilityServicesFactory {

    /**
     * Creates HA services whose checkpoint recovery factory is built from a fresh
     * {@link TestingCompletedCheckpointStore} and a fresh {@link StandaloneCheckpointIDCounter}.
     *
     * @param configuration not used by this factory
     * @param executor executor handed on to {@link TestingHaServices}
     * @return the newly created testing HA services
     */
    @Override
    public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
        final CheckpointRecoveryFactory recoveryFactory = new TestingCheckpointRecoveryFactory(
            new TestingCompletedCheckpointStore(),
            new StandaloneCheckpointIDCounter());
        return new TestingHaServices(recoveryFactory, executor);
    }
}
}

0 comments on commit db436d9

Please sign in to comment.