Skip to content

Commit

Permalink
[FLINK-11156][tests, runtime] Simplify ZooKeeperCompletedCheckpointSt…
Browse files Browse the repository at this point in the history
…ore constructor
  • Loading branch information
GJL committed Jan 15, 2019
1 parent 02078e0 commit eff261e
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingConsumer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -75,9 +70,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto

private static final Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> STRING_COMPARATOR = Comparator.comparing(o -> o.f1);

/** Curator ZooKeeper client. */
private final CuratorFramework client;

/** Completed checkpoints in ZooKeeper. */
private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;

Expand All @@ -100,79 +92,23 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* least 1). Adding more checkpoints than this results
* in older checkpoints being discarded. On recovery,
* we will only start with a single checkpoint.
* @param client The Curator ZooKeeper client
* @param checkpointsPath The ZooKeeper path for the checkpoints (needs to
* start with a '/')
* @param stateStorage State storage to be used to persist the completed
* checkpoint
* @param executor to execute blocking calls
* @throws Exception
* @param checkpointsInZooKeeper Completed checkpoints in ZooKeeper
* @param executor to execute blocking calls
*/
public ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
String checkpointsPath,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor
) throws Exception {
this(maxNumberOfCheckpointsToRetain,
adaptNameSpace(client, checkpointsPath),
stateStorage,
executor);

LOG.info("Initialized in '{}'.", checkpointsPath);
}

@VisibleForTesting
ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
String checkpointsPath,
Executor executor,
ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper
) throws Exception {
this(maxNumberOfCheckpointsToRetain,
adaptNameSpace(client, checkpointsPath),
executor,
checkpointsInZooKeeper);

LOG.info("Initialized in '{}'.", checkpointsPath);
}

private ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor
) {
this(maxNumberOfCheckpointsToRetain,
client,
executor,
new ZooKeeperStateHandleStore<>(client, stateStorage));
}
int maxNumberOfCheckpointsToRetain,
ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper,
Executor executor) {

private ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
@Nonnull CuratorFramework client,
@Nonnull Executor executor,
@Nonnull ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper
) {
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");

this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
this.client = client;
this.executor = executor;
this.checkpointsInZooKeeper = checkpointsInZooKeeper;
this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
}

private static CuratorFramework adaptNameSpace(CuratorFramework client, String checkpointsPath) throws Exception {
// Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(checkpointsPath)
.ensure(client.getZookeeperClient());
this.checkpointsInZooKeeper = checkNotNull(checkpointsInZooKeeper);

this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);

// All operations will have the path as root
return client.usingNamespace(client.getNamespace() + checkpointsPath);
this.executor = checkNotNull(executor);
}

@Override
Expand Down Expand Up @@ -345,11 +281,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {
}

completedCheckpoints.clear();

String path = "/" + client.getNamespace();

LOG.info("Removing {} from ZooKeeper", path);
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
checkpointsInZooKeeper.deleteChildren();
} else {
LOG.info("Suspending");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,31 @@ public static CompletedCheckpointStore createCompletedCheckpoints(

checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);

return new ZooKeeperCompletedCheckpointStore(
final ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
client,
checkpointsPath,
stateStorage,
createZooKeeperStateHandleStore(client, checkpointsPath, stateStorage),
executor);

LOG.info("Initialized {} in '{}'.", ZooKeeperCompletedCheckpointStore.class.getSimpleName(), checkpointsPath);
return zooKeeperCompletedCheckpointStore;
}

/**
* Creates an instance of {@link ZooKeeperStateHandleStore}.
*
* @param client ZK client
* @param path Path to use for the client namespace
* @param stateStorage RetrievableStateStorageHelper that persist the actual state and whose
* returned state handle is then written to ZooKeeper
* @param <T> Type of state
* @return {@link ZooKeeperStateHandleStore} instance
* @throws Exception ZK errors
*/
public static <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
final CuratorFramework client,
final String path,
final RetrievableStateStorageHelper<T> stateStorage) throws Exception {
return new ZooKeeperStateHandleStore<>(useNamespaceAndEnsurePath(client, path), stateStorage);
}

/**
Expand Down Expand Up @@ -362,6 +381,27 @@ public static String generateZookeeperPath(String root, String namespace) {
return root + namespace;
}

/**
* Returns a facade of the client that uses the specified namespace, and ensures that all nodes
* in the path exist.
*
* @param client ZK client
* @param path the new namespace
* @return ZK Client that uses the new namespace
* @throws Exception ZK errors
*/
public static CuratorFramework useNamespaceAndEnsurePath(final CuratorFramework client, final String path) throws Exception {
Preconditions.checkNotNull(client, "client must not be null");
Preconditions.checkNotNull(path, "path must not be null");

// Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(path)
.ensure(client.getZookeeperClient());

// All operations will have the path as root
return client.usingNamespace(generateZookeeperPath(client.getNamespace(), path));
}

/**
* Secure {@link ACLProvider} implementation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.util.InstantiationUtil;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -424,6 +425,17 @@ public void releaseAll() throws Exception {
}
}

/**
* Recursively deletes all children.
*
* @throws Exception ZK errors
*/
public void deleteChildren() throws Exception {
final String path = "/" + client.getNamespace();
LOG.info("Removing {} from ZooKeeper", path);
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
}

// ---------------------------------------------------------------------------------------------------------
// Protected methods
// ---------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,10 @@ public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStat
String zkStateHandleStorePath,
RetrievableStateStorageHelper<T> stateStorageHelper) throws Exception {

facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
ZooKeeperUtils.generateZookeeperPath(
facade.getNamespace(),
zkStateHandleStorePath));

return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper);
return ZooKeeperUtils.createZooKeeperStateHandleStore(
facade,
zkStateHandleStorePath,
stateStorageHelper);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;

import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -67,10 +69,14 @@ public void cleanUp() throws Exception {

@Override
protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception {
return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore(
ZOOKEEPER.getClient(),
CHECKPOINT_PATH,
new HeapStateStorageHelper(),
new TestingRetrievableStateStorageHelper<>());

return new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
checkpointsInZooKeeper,
Executors.directExecutor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}
});

final String checkpointsPath = "foobar";

ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
numCheckpointsToRetain,
client,
checkpointsPath,
Executors.directExecutor(),
zooKeeperStateHandleStoreMock);
zooKeeperStateHandleStoreMock,
Executors.directExecutor());

zooKeeperCompletedCheckpointStore.recover();

Expand Down Expand Up @@ -221,14 +217,11 @@ public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invoc
doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());

final int numCheckpointsToRetain = 1;
final String checkpointsPath = "foobar";

ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
numCheckpointsToRetain,
client,
checkpointsPath,
Executors.directExecutor(),
zookeeperStateHandleStoreMock);
zookeeperStateHandleStoreMock,
Executors.directExecutor());

for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;

import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -117,11 +118,14 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception {

@Nonnull
private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
return new ZooKeeperCompletedCheckpointStore(
1,
final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore(
client,
"/checkpoints",
new TestingRetrievableStateStorageHelper<>(),
new TestingRetrievableStateStorageHelper<>());

return new ZooKeeperCompletedCheckpointStore(
1,
checkpointsInZooKeeper,
Executors.directExecutor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@
import java.util.List;
import java.util.Set;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -685,6 +688,18 @@ public void testReleaseAll() throws Exception {
assertEquals(0, stat.getNumChildren());
}

@Test
public void testDeleteAllShouldRemoveAllPaths() throws Exception {
final ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path"),
new LongStateStorage());

zkStore.addAndLock("/state", 1L);
zkStore.deleteChildren();

assertThat(zkStore.getAllPaths(), is(empty()));
}

// ---------------------------------------------------------------------------------------------
// Simple test helpers
// ---------------------------------------------------------------------------------------------
Expand Down

0 comments on commit eff261e

Please sign in to comment.