Skip to content

Commit

Permalink
[FLINK-11156][tests, runtime] Reconcile ZooKeeperCompletedCheckpointS…
Browse files Browse the repository at this point in the history
…toreMockitoTest with JDK 9
  • Loading branch information
tisonkun authored and GJL committed Jan 15, 2019
1 parent e7b02b4 commit 02078e0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
Expand All @@ -32,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -107,34 +109,70 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* @throws Exception
*/
public ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
String checkpointsPath,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor) throws Exception {
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
String checkpointsPath,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor
) throws Exception {
this(maxNumberOfCheckpointsToRetain,
adaptNameSpace(client, checkpointsPath),
stateStorage,
executor);

checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");

this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
LOG.info("Initialized in '{}'.", checkpointsPath);
}

checkNotNull(client, "Curator client");
checkNotNull(checkpointsPath, "Checkpoints path");
@VisibleForTesting
ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
String checkpointsPath,
Executor executor,
ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper
) throws Exception {
this(maxNumberOfCheckpointsToRetain,
adaptNameSpace(client, checkpointsPath),
executor,
checkpointsInZooKeeper);

// Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(checkpointsPath)
.ensure(client.getZookeeperClient());
LOG.info("Initialized in '{}'.", checkpointsPath);
}

// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
private ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
CuratorFramework client,
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor
) {
this(maxNumberOfCheckpointsToRetain,
client,
executor,
new ZooKeeperStateHandleStore<>(client, stateStorage));
}

this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
private ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
@Nonnull CuratorFramework client,
@Nonnull Executor executor,
@Nonnull ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper
) {
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");

this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
this.client = client;
this.executor = executor;
this.checkpointsInZooKeeper = checkpointsInZooKeeper;
this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
}

this.executor = checkNotNull(executor);
private static CuratorFramework adaptNameSpace(CuratorFramework client, String checkpointsPath) throws Exception {
// Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(checkpointsPath)
.ensure(client.getZookeeperClient());

LOG.info("Initialized in '{}'.", checkpointsPath);
// All operations will have the path as root
return client.usingNamespace(client.getNamespace() + checkpointsPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@
import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.utils.EnsurePath;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -55,21 +52,18 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.whenNew;

/**
* Mockito based tests for the {@link ZooKeeperStateHandleStore}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {

/**
Expand Down Expand Up @@ -125,7 +119,6 @@ public void testCheckpointRecovery() throws Exception {
final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);

ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();

final int numCheckpointsToRetain = 1;
Expand Down Expand Up @@ -165,14 +158,13 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
});

final String checkpointsPath = "foobar";
final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);

ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
numCheckpointsToRetain,
client,
checkpointsPath,
stateStorage,
Executors.directExecutor());
Executors.directExecutor(),
zooKeeperStateHandleStoreMock);

zooKeeperCompletedCheckpointStore.recover();

Expand Down Expand Up @@ -213,7 +205,6 @@ public void testAddCheckpointWithFailedRemove() throws Exception {

ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);

doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@Override
Expand All @@ -231,14 +222,13 @@ public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invoc

final int numCheckpointsToRetain = 1;
final String checkpointsPath = "foobar";
final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);

ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
numCheckpointsToRetain,
client,
checkpointsPath,
stateSotrage,
Executors.directExecutor());
Executors.directExecutor(),
zookeeperStateHandleStoreMock);

for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
Expand Down

0 comments on commit 02078e0

Please sign in to comment.