Skip to content

Commit

Permalink
[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Mar 11, 2018
1 parent 0f3b3f9 commit 4c85b74
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void recover() throws Exception {

if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
throw new FlinkException(
"Could not read any of the " + numberOfInitialCheckpoints + " from storage.");
"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
LOG.warn(
"Could only fetch {} of {} checkpoints from storage.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
 * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
 *
 * <p>The test starts a mini cluster with ZooKeeper HA enabled (backed by a Curator
 * {@link TestingServer}) and verifies that a job is NOT restarted from scratch when the
 * completed-checkpoint metadata in the HA storage directory cannot be read.
 */
public class ZooKeeperHighAvailabilityITCase extends TestLogger {

    // Deadline for the status-polling loops in the test body. NOTE(review): despite the
    // unit, 10000 seconds is effectively "forever" here — the JUnit timeout of 120s on the
    // test method fires long before this deadline would.
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);

    // Cluster topology: one JobManager, one TaskManager with a single slot, so the job
    // runs with parallelism 1 and all state lives on one TM.
    private static final int NUM_JMS = 1;
    private static final int NUM_TMS = 1;
    private static final int NUM_SLOTS_PER_TM = 1;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    // HA storage directory; the test moves "completedCheckpoint*" files out of (and back
    // into) this directory to simulate unreadable checkpoint metadata.
    private static File haStorageDir;

    // In-process ZooKeeper used as the HA backend.
    private static TestingServer zkServer;

    private static MiniClusterResource miniClusterResource;

    // Test <-> operator synchronization. Re-created per test run in the test method;
    // static so the (serialized, remotely executed) functions can reach them, which works
    // because the mini cluster runs in the same JVM.
    private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    private static OneShotLatch failInCheckpointLatch = new OneShotLatch();

    /**
     * Starts the ZooKeeper test server and a ZooKeeper-HA-enabled mini cluster whose HA
     * storage path points at a fresh temporary folder.
     */
    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer();

        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

        haStorageDir = TEMPORARY_FOLDER.newFolder();

        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
        // random cluster id so repeated runs don't collide in ZooKeeper
        config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        // we have to manage this manually because we have to create the ZooKeeper server
        // ahead of this
        miniClusterResource = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                config,
                NUM_TMS,
                NUM_SLOTS_PER_TM),
            true);

        miniClusterResource.before();
    }

    /** Shuts down the mini cluster first, then the ZooKeeper server it depends on. */
    @AfterClass
    public static void tearDown() throws Exception {
        miniClusterResource.after();

        zkServer.stop();
        zkServer.close();
    }

    /**
     * Verify that we don't start a job from scratch if we cannot restore any of the
     * CompletedCheckpoints.
     *
     * <p>Synchronization for the different steps and things we want to observe happens via
     * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
     *
     * <p>The test follows these steps:
     * <ol>
     * <li>Start job and block on a latch until we have done some checkpoints
     * <li>Block in the special function
     * <li>Move away the contents of the ZooKeeper HA directory to make restoring from
     * checkpoints impossible
     * <li>Unblock the special function, which now induces a failure
     * <li>Make sure that the job does not recover successfully
     * <li>Move back the HA directory
     * <li>Make sure that the job recovers, we use a latch to ensure that the operator
     * restored successfully
     * </ol>
     */
    @Test(timeout = 120_000L)
    public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
        // reset all static cross-run state of the operator before submitting the job
        CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
        CheckpointBlockingFunction.successfulRestores.set(0);
        CheckpointBlockingFunction.illegalRestores.set(0);
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
        CheckpointBlockingFunction.failedAlready.set(false);

        waitForCheckpointLatch = new OneShotLatch();
        failInCheckpointLatch = new OneShotLatch();

        ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // restart immediately and (practically) forever so we can observe RESTARTING/FAILING cycles
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
        env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms

        File checkpointLocation = TEMPORARY_FOLDER.newFolder();
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointLocation.toURI()));

        DataStreamSource<String> source = env.addSource(new UnboundedSource());

        source
            .keyBy((str) -> str)
            .map(new CheckpointBlockingFunction());

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());

        clusterClient.setDetached(true);
        clusterClient.submitJob(jobGraph, ZooKeeperHighAvailabilityITCase.class.getClassLoader());

        // wait until we did some checkpoints
        waitForCheckpointLatch.await();

        // mess with the HA directory so that the job cannot restore
        File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
        int numCheckpoints = 0;
        File[] files = haStorageDir.listFiles();
        assertNotNull(files);
        for (File file : files) {
            if (file.getName().startsWith("completedCheckpoint")) {
                assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
                numCheckpoints++;
            }
        }
        // Note to future developers: This will break when we change Flink to not put the
        // checkpoint metadata into the HA directory but instead rely on the fact that the
        // actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
        // ZooKeeper will only contain a "handle" (read: String) that points to the metadata
        // in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
        // out where the checkpoint is stored and mess with that.
        assertTrue(numCheckpoints > 0);

        // release the operator: it will now throw once inside snapshotState()
        failInCheckpointLatch.trigger();

        // Ensure that we see at least one cycle where the job tries to restart and fails.
        CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
            () -> clusterClient.getJobStatus(jobID),
            Time.milliseconds(1),
            deadline,
            (jobStatus) -> jobStatus == JobStatus.RESTARTING,
            TestingUtils.defaultScheduledExecutor());
        assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());

        // ... and then fails again because the checkpoint metadata is still unreadable
        jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
            () -> clusterClient.getJobStatus(jobID),
            Time.milliseconds(1),
            deadline,
            (jobStatus) -> jobStatus == JobStatus.FAILING,
            TestingUtils.defaultScheduledExecutor());
        assertEquals(JobStatus.FAILING, jobStatusFuture.get());

        // move back the HA directory so that the job can restore
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);

        files = movedCheckpointLocation.listFiles();
        assertNotNull(files);
        for (File file : files) {
            if (file.getName().startsWith("completedCheckpoint")) {
                assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
            }
        }

        // now the job should be able to go to RUNNING again and then eventually to FINISHED,
        // which it only does if it could successfully restore
        jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
            () -> clusterClient.getJobStatus(jobID),
            Time.milliseconds(50),
            deadline,
            (jobStatus) -> jobStatus == JobStatus.FINISHED,
            TestingUtils.defaultScheduledExecutor());
        assertEquals(JobStatus.FINISHED, jobStatusFuture.get());

        assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    }

    /**
     * Source that emits "hello" every 50 ms until it is cancelled or until
     * {@link CheckpointBlockingFunction#afterMessWithZooKeeper} flips to true, at which
     * point it returns and lets the job finish.
     */
    private static class UnboundedSource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
                ctx.collect("hello");
                // don't overdo it ... ;-)
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Stateful map function that coordinates with the test via the static latches: it
     * signals once enough checkpoints have completed, fails exactly once on purpose, and
     * counts (il)legal restores in {@link #initializeState(FunctionInitializationContext)}.
     */
    private static class CheckpointBlockingFunction
            extends RichMapFunction<String, String>
            implements CheckpointedFunction {

        // verify that we only call initializeState()
        // once with isRestored() == false. All other invocations must have isRestored() == true. This
        // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
        // be read.
        static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);

        // we count when we see restores that are not allowed. We only
        // allow restores once we messed with the HA directory and moved it back again
        static AtomicInteger illegalRestores = new AtomicInteger(0);
        static AtomicInteger successfulRestores = new AtomicInteger(0);

        // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
        // whether it's now ok for a restore to happen
        static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);

        // ensures the on-purpose failure in snapshotState() is thrown exactly once
        static AtomicBoolean failedAlready = new AtomicBoolean(false);

        // also have some state to write to the checkpoint
        private final ValueStateDescriptor<String> stateDescriptor =
            new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);

        @Override
        public String map(String value) throws Exception {
            // write a constant into keyed state so every checkpoint carries state
            getRuntimeContext().getState(stateDescriptor).update("42");
            return value;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // after a handful of successful checkpoints, tell the test we're ready, then
            // block until the test has moved the HA files away; fail exactly once
            if (context.getCheckpointId() > 5) {
                waitForCheckpointLatch.trigger();
                failInCheckpointLatch.await();
                if (!failedAlready.getAndSet(true)) {
                    throw new RuntimeException("Failing on purpose.");
                }
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) {
            if (!context.isRestored()) {
                // only the very first (pre-failure) call may see isRestored() == false
                int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet();
                if (updatedValue < 0) {
                    illegalRestores.getAndIncrement();
                    throw new RuntimeException("We are not allowed any more restores.");
                }
            } else {
                if (!afterMessWithZooKeeper.get()) {
                    // restores must not succeed while the HA files are moved away
                    illegalRestores.getAndIncrement();
                } else if (successfulRestores.getAndIncrement() > 0) {
                    // already saw the one allowed successful restore
                    illegalRestores.getAndIncrement();
                }
            }
        }
    }
}

0 comments on commit 4c85b74

Please sign in to comment.