Skip to content

Commit

Permalink
[FLINK-14091][tests] Refactor ZooKeeperCheckpointIDCounter for a more…
Browse files Browse the repository at this point in the history
… testable codebase
  • Loading branch information
tisonkun authored and tillrohrmann committed Jan 15, 2020
1 parent 25b1697 commit 7a0fa1e
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.ArrayList;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -60,12 +66,14 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
/** Curator recipe for shared counts */
private final SharedCount sharedCount;

/** Connection state listener to monitor the client connection */
private final SharedCountConnectionStateListener connStateListener =
new SharedCountConnectionStateListener();
private final Collection<ConnectionStateListener> connectionStateListeners;

private final Object startStopLock = new Object();

@Nullable
private volatile ConnectionState lastState;

@GuardedBy("startStopLock")
private boolean isStarted;

/**
Expand All @@ -78,14 +86,26 @@ public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath)
this.client = checkNotNull(client, "Curator client");
this.counterPath = checkNotNull(counterPath, "Counter path");
this.sharedCount = new SharedCount(client, counterPath, 1);

this.connectionStateListeners = new ArrayList<>();
this.connectionStateListeners.add((ignore, newState) -> lastState = newState);
}

@VisibleForTesting
ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath, Collection<ConnectionStateListener> listeners) {
this(client, counterPath);
this.connectionStateListeners.addAll(listeners);
}

@Override
public void start() throws Exception {
synchronized (startStopLock) {
if (!isStarted) {
sharedCount.start();
client.getConnectionStateListenable().addListener(connStateListener);

for (ConnectionStateListener listener : connectionStateListeners) {
client.getConnectionStateListenable().addListener(listener);
}

isStarted = true;
}
Expand All @@ -98,7 +118,10 @@ public void shutdown(JobStatus jobStatus) throws Exception {
if (isStarted) {
LOG.info("Shutting down.");
sharedCount.close();
client.getConnectionStateListenable().removeListener(connStateListener);

for (ConnectionStateListener listener : connectionStateListeners) {
client.getConnectionStateListenable().removeListener(listener);
}

if (jobStatus.isGloballyTerminalState()) {
LOG.info("Removing {} from ZooKeeper", counterPath);
Expand All @@ -113,7 +136,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {
@Override
public long getAndIncrement() throws Exception {
while (true) {
connStateListener.checkConnectionState();
checkConnectionState();

VersionedValue<Integer> current = sharedCount.getVersionedValue();
int newCount = current.getValue() + 1;
Expand All @@ -132,14 +155,14 @@ public long getAndIncrement() throws Exception {

@Override
public long get() {
connStateListener.checkConnectionState();
checkConnectionState();

return sharedCount.getVersionedValue().getValue();
}

@Override
public void setCount(long newId) throws Exception {
connStateListener.checkConnectionState();
checkConnectionState();

if (newId > Integer.MAX_VALUE) {
throw new IllegalArgumentException("ZooKeeper checkpoint counter only supports " +
Expand All @@ -150,32 +173,15 @@ public void setCount(long newId) throws Exception {
sharedCount.setCount((int) newId);
}

@VisibleForTesting
ConnectionState getLastState() {
return connStateListener.lastState;
}

/**
* Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
* ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
*/
private static class SharedCountConnectionStateListener implements ConnectionStateListener {

private volatile ConnectionState lastState;
private void checkConnectionState() {
final ConnectionState currentLastState = this.lastState;

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
lastState = newState;
if (currentLastState == null) {
return;
}

private void checkConnectionState() {
if (lastState == null) {
return;
}

if (lastState != ConnectionState.CONNECTED && lastState != ConnectionState.RECONNECTED) {
throw new IllegalStateException("Connection state: " + lastState);
}
if (currentLastState != ConnectionState.CONNECTED && currentLastState != ConnectionState.RECONNECTED) {
throw new IllegalStateException("Connection state: " + currentLastState);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,91 +18,85 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.TestingCluster;
import org.junit.AfterClass;
import org.junit.Before;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.junit.Rule;
import org.junit.Test;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link ZooKeeperCheckpointIDCounter} in a ZooKeeper ensemble.
*/
public final class ZKCheckpointIDCounterMultiServersTest extends TestLogger {

private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(3);

@AfterClass
public static void tearDown() throws Exception {
ZOOKEEPER.shutdown();
}

@Before
public void cleanUp() throws Exception {
ZOOKEEPER.deleteAll();
}
@Rule
public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

/**
* Tests that {@link ZooKeeperCheckpointIDCounter} can be recovered after a
* connection loss exception from ZooKeeper ensemble.
*
* See also FLINK-14091.
* <p>See also FLINK-14091.
*/
@Test
public void testRecoveredAfterConnectionLoss() throws Exception {
CuratorFramework client = ZOOKEEPER.getClient();

ZooKeeperCheckpointIDCounter idCounter = new ZooKeeperCheckpointIDCounter(client, "/checkpoint-id-counter");
idCounter.start();
final Configuration configuration = new Configuration();
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);

AtomicLong localCounter = new AtomicLong(1L);
try {
OneShotLatch connectionLossLatch = new OneShotLatch();
OneShotLatch reconnectedLatch = new OneShotLatch();

assertThat(
"ZooKeeperCheckpointIDCounter doesn't properly work.",
idCounter.getAndIncrement(),
is(localCounter.getAndIncrement()));
ConnectionStateListener listener = (ignore, newState) -> {
if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
connectionLossLatch.trigger();
}

TestingCluster cluster = ZOOKEEPER.getZooKeeperCluster();
assertThat(cluster, is(notNullValue()));
if (newState == ConnectionState.RECONNECTED) {
reconnectedLatch.trigger();
}
};

// close the server this client connected to, which triggers a connection loss exception
cluster.restartServer(cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()));
ZooKeeperCheckpointIDCounter idCounter = new ZooKeeperCheckpointIDCounter(
client,
"/checkpoint-id-counter",
Collections.singleton(listener));
idCounter.start();

// encountered connected loss, this prevents us from getting false positive
while (true) {
try {
idCounter.get();
} catch (IllegalStateException ignore) {
log.debug("Encountered connection loss.");
break;
}
}
AtomicLong localCounter = new AtomicLong(1L);

// recovered from connection loss
while (true) {
try {
long id = idCounter.get();
assertThat(id, is(localCounter.get()));
break;
} catch (IllegalStateException ignore) {
log.debug("During ZooKeeper client reconnecting...");
}
}
assertThat(
"ZooKeeperCheckpointIDCounter doesn't properly work.",
idCounter.getAndIncrement(),
is(localCounter.getAndIncrement()));

zooKeeperResource.restart();

assertThat(idCounter.getLastState(), is(ConnectionState.RECONNECTED));
assertThat(
"ZooKeeperCheckpointIDCounter doesn't properly work after reconnected.",
idCounter.getAndIncrement(),
is(localCounter.getAndIncrement()));
connectionLossLatch.await();
reconnectedLatch.await();

assertThat(
"ZooKeeperCheckpointIDCounter doesn't properly work after reconnected.",
idCounter.getAndIncrement(),
is(localCounter.getAndIncrement()));
} finally {
client.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ protected void after() {
LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
}
}

public void restart() throws Exception {
Preconditions.checkNotNull(zooKeeperServer);
zooKeeperServer.restart();
}
}

0 comments on commit 7a0fa1e

Please sign in to comment.