Skip to content

Commit

Permalink
[FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
Browse files Browse the repository at this point in the history
Move initialization logic out of the ZooKeeperSubmittedJobGraphStore constructor.
  • Loading branch information
tillrohrmann committed Jan 9, 2019
1 parent f817537 commit 0315f20
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -69,9 +68,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
private final Object cacheLock = new Object();

/** Client (not a namespace facade). */
private final CuratorFramework client;

/** The set of IDs of all added job graphs. */
private final Set<JobID> addedJobGraphs = new HashSet<>();

Expand All @@ -96,34 +92,20 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/**
* Submitted job graph store backed by ZooKeeper.
*
* @param client ZooKeeper client
* @param currentJobsPath ZooKeeper path for current job graphs
* @param stateStorage State storage used to persist the submitted jobs
* @throws Exception
* @param zooKeeperFullBasePath ZooKeeper path for current job graphs
* @param zooKeeperStateHandleStore State storage used to persist the submitted jobs
*/
public ZooKeeperSubmittedJobGraphStore(
CuratorFramework client,
String currentJobsPath,
RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {

checkNotNull(currentJobsPath, "Current jobs path");
checkNotNull(stateStorage, "State storage");

// Keep a reference to the original client and not the namespace facade. The namespace
// facade cannot be closed.
this.client = checkNotNull(client, "Curator client");

// Ensure that the job graphs path exists
client.newNamespaceAwareEnsurePath(currentJobsPath)
.ensure(client.getZookeeperClient());
String zooKeeperFullBasePath,
ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore,
PathChildrenCache pathCache) {

// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
checkNotNull(zooKeeperFullBasePath, "Current jobs path");

this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
this.zooKeeperFullBasePath = zooKeeperFullBasePath;
this.jobGraphsInZooKeeper = checkNotNull(zooKeeperStateHandleStore);

this.pathCache = new PathChildrenCache(facade, "/", false);
this.pathCache = checkNotNull(pathCache);
pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.Preconditions;

Expand All @@ -41,6 +42,7 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -244,10 +246,23 @@ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);

// Ensure that the job graphs path exists
client.newNamespaceAwareEnsurePath(zooKeeperSubmittedJobsPath)
.ensure(client.getZookeeperClient());

// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + zooKeeperSubmittedJobsPath);

final String zooKeeperFullSubmittedJobsPath = client.getNamespace() + zooKeeperSubmittedJobsPath;

final ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(facade, stateStorage);

final PathChildrenCache pathCache = new PathChildrenCache(facade, "/", false);

return new ZooKeeperSubmittedJobGraphStore(
client,
zooKeeperSubmittedJobsPath,
stateStorage);
zooKeeperFullSubmittedJobsPath,
zooKeeperStateHandleStore,
pathCache);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -98,9 +100,9 @@ public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
@Nonnull
public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
return new ZooKeeperSubmittedJobGraphStore(
client,
"/foobar",
stateStorage);
client.getNamespace(),
new ZooKeeperStateHandleStore<>(client, stateStorage),
new PathChildrenCache(client, "/", false));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -59,9 +64,9 @@
*/
public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {

private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);

private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
private static final RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
@Override
public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException {
ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
Expand All @@ -71,7 +76,6 @@ public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state)
}
};


@AfterClass
public static void tearDown() throws Exception {
if (ZooKeeper != null) {
Expand All @@ -86,10 +90,7 @@ public void cleanUp() throws Exception {

@Test
public void testPutAndRemoveJobGraph() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(),
"/testPutAndRemoveJobGraph",
localStateStorage);
ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testPutAndRemoveJobGraph");

try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
Expand Down Expand Up @@ -142,10 +143,25 @@ public void testPutAndRemoveJobGraph() throws Exception {
}
}

@Nonnull
private ZooKeeperSubmittedJobGraphStore createZooKeeperSubmittedJobGraphStore(String fullPath) throws Exception {
final CuratorFramework client = ZooKeeper.getClient();
// Ensure that the job graphs path exists
client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());

// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
return new ZooKeeperSubmittedJobGraphStore(
fullPath,
new ZooKeeperStateHandleStore<>(
facade,
localStateStorage),
new PathChildrenCache(facade, "/", false));
}

@Test
public void testRecoverJobGraphs() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testRecoverJobGraphs");

try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
Expand Down Expand Up @@ -195,12 +211,9 @@ public void testConcurrentAddJobGraph() throws Exception {
ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;

try {
jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);

otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
jobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");

otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");

SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
SubmittedJobGraph otherJobGraph = createSubmittedJobGraph(new JobID(), 0);
Expand Down Expand Up @@ -254,11 +267,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

@Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");

ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
ZooKeeperSubmittedJobGraphStore otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");

jobGraphs.start(null);
otherJobGraphs.start(null);
Expand Down

0 comments on commit 0315f20

Please sign in to comment.