Skip to content

Commit

Permalink
[FLINK-24437][HA]Remove unhandled exception handler from CuratorFrame…
Browse files Browse the repository at this point in the history
…work before closing it
  • Loading branch information
Aitozi authored and tillrohrmann committed Oct 8, 2021
1 parent 58bd394 commit 43ff973
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -147,9 +145,9 @@ public static ClientHighAvailabilityServices createClientHAService(
configuration, AddressResolution.TRY_ADDRESS_RESOLUTION);
return new StandaloneClientHAServices(webMonitorAddress);
case ZOOKEEPER:
final CuratorFramework client =
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler);
return new ZooKeeperClientHAServices(client, configuration);
return new ZooKeeperClientHAServices(
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
configuration);
case FACTORY_CLASS:
return createCustomClientHAServices(configuration);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.highavailability.zookeeper;

import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;

import java.io.Closeable;

/**
 * Bundles a {@link CuratorFramework} together with an {@link UnhandledErrorListener} that has to
 * be unregistered from the framework before the framework is closed.
 *
 * <p>{@link #close()} performs both steps in that order: it removes the listener and then closes
 * the wrapped framework.
 */
public class CuratorFrameworkWithUnhandledErrorListener implements Closeable {

    /** The wrapped curator framework; passed through a null check at construction. */
    private final CuratorFramework curatorFramework;

    /** The listener that is removed from the framework on {@link #close()}. */
    private final UnhandledErrorListener unhandledErrorListener;

    /**
     * Creates the wrapper. Both arguments are validated via {@code Preconditions.checkNotNull}.
     *
     * @param client the curator framework to wrap
     * @param listener the unhandled error listener to remove from {@code client} when closing
     */
    public CuratorFrameworkWithUnhandledErrorListener(
            CuratorFramework client, UnhandledErrorListener listener) {
        curatorFramework = Preconditions.checkNotNull(client);
        unhandledErrorListener = Preconditions.checkNotNull(listener);
    }

    /**
     * Exposes the wrapped framework for callers that need the plain curator API.
     *
     * @return the wrapped {@link CuratorFramework}
     */
    public CuratorFramework asCuratorFramework() {
        return curatorFramework;
    }

    /** Removes the listener from the wrapped framework and then closes the framework. */
    @Override
    public void close() {
        // Unregister first, so the listener is gone before the framework is closed.
        curatorFramework.getUnhandledErrorListenable().removeListener(unhandledErrorListener);
        curatorFramework.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,34 @@
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

/** ZooKeeper based implementation for {@link ClientHighAvailabilityServices}. */
public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices {

private final CuratorFramework client;
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;

private final Configuration configuration;

public ZooKeeperClientHAServices(
@Nonnull CuratorFramework client, @Nonnull Configuration configuration) {
this.client = client;
this.configuration = configuration;
@Nonnull CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
@Nonnull Configuration configuration) {
this.curatorFrameworkWrapper = Preconditions.checkNotNull(curatorFrameworkWrapper);
this.configuration = Preconditions.checkNotNull(configuration);
}

@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(
client, ZooKeeperUtils.getLeaderPathForRestServer(), configuration);
curatorFrameworkWrapper.asCuratorFramework(),
ZooKeeperUtils.getLeaderPathForRestServer(),
configuration);
}

@Override
public void close() throws Exception {
client.close();
curatorFrameworkWrapper.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,49 +100,54 @@ public class ZooKeeperHaServices extends AbstractHaServices {

// ------------------------------------------------------------------------

/** The ZooKeeper client to use. */
private final CuratorFramework client;
/** The curator resource to use. */
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;

public ZooKeeperHaServices(
CuratorFramework client,
CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
Executor executor,
Configuration configuration,
BlobStoreService blobStoreService) {
super(configuration, executor, blobStoreService);
this.client = checkNotNull(client);
this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper);
}

@Override
public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception {
return new ZooKeeperCheckpointRecoveryFactory(
ZooKeeperUtils.useNamespaceAndEnsurePath(client, ZooKeeperUtils.getJobsPath()),
ZooKeeperUtils.useNamespaceAndEnsurePath(
curatorFrameworkWrapper.asCuratorFramework(), ZooKeeperUtils.getJobsPath()),
configuration,
ioExecutor);
}

@Override
public JobGraphStore createJobGraphStore() throws Exception {
return ZooKeeperUtils.createJobGraphs(client, configuration);
return ZooKeeperUtils.createJobGraphs(
curatorFrameworkWrapper.asCuratorFramework(), configuration);
}

@Override
public RunningJobsRegistry createRunningJobsRegistry() {
return new ZooKeeperRunningJobsRegistry(client, configuration);
return new ZooKeeperRunningJobsRegistry(
curatorFrameworkWrapper.asCuratorFramework(), configuration);
}

@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath) {
return ZooKeeperUtils.createLeaderElectionService(client, leaderPath);
return ZooKeeperUtils.createLeaderElectionService(
curatorFrameworkWrapper.asCuratorFramework(), leaderPath);
}

@Override
protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath, configuration);
return ZooKeeperUtils.createLeaderRetrievalService(
curatorFrameworkWrapper.asCuratorFramework(), leaderPath, configuration);
}

@Override
public void internalClose() {
client.close();
curatorFrameworkWrapper.close();
}

@Override
Expand Down Expand Up @@ -198,13 +203,17 @@ private void deleteZNode(String path) throws Exception {
// The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
boolean zNodeDeleted = false;
while (!zNodeDeleted) {
Stat stat = client.checkExists().forPath(path);
Stat stat = curatorFrameworkWrapper.asCuratorFramework().checkExists().forPath(path);
if (stat == null) {
logger.debug("znode {} has been deleted", path);
return;
}
try {
client.delete().deletingChildrenIfNeeded().forPath(path);
curatorFrameworkWrapper
.asCuratorFramework()
.delete()
.deletingChildrenIfNeeded()
.forPath(path);
zNodeDeleted = true;
} catch (KeeperException.NoNodeException ignored) {
// concurrent delete operation. Try again.
Expand All @@ -225,8 +234,12 @@ private void deleteZNode(String path) throws Exception {
*/
private void tryDeleteEmptyParentZNodes() throws Exception {
// try to delete the parent znodes if they are empty
String remainingPath = getParentPath(getNormalizedPath(client.getNamespace()));
final CuratorFramework nonNamespaceClient = client.usingNamespace(null);
String remainingPath =
getParentPath(
getNormalizedPath(
curatorFrameworkWrapper.asCuratorFramework().getNamespace()));
final CuratorFramework nonNamespaceClient =
curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null);

while (!isRootPath(remainingPath)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
Expand Down Expand Up @@ -153,9 +154,9 @@ public static String generateLeaderLatchPath(String path) {
* @param configuration {@link Configuration} object containing the configuration values
* @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
* errors of {@link CuratorFramework}
* @return {@link CuratorFramework} instance
* @return {@link CuratorFrameworkWithUnhandledErrorListener} instance
*/
public static CuratorFramework startCuratorFramework(
public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
Configuration configuration, FatalErrorHandler fatalErrorHandler) {
checkNotNull(configuration, "configuration");
String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
Expand Down Expand Up @@ -239,10 +240,10 @@ public static CuratorFramework startCuratorFramework(
* @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
* @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
* errors of {@link CuratorFramework}
* @return {@link CuratorFramework} instance
* @return {@link CuratorFrameworkWithUnhandledErrorListener} instance
*/
@VisibleForTesting
public static CuratorFramework startCuratorFramework(
public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
CuratorFrameworkFactory.Builder builder, FatalErrorHandler fatalErrorHandler) {
CuratorFramework cf = builder.build();
UnhandledErrorListener unhandledErrorListener =
Expand All @@ -258,7 +259,7 @@ public static CuratorFramework startCuratorFramework(
};
cf.getUnhandledErrorListenable().addListener(unhandledErrorListener);
cf.start();
return cf;
return new CuratorFrameworkWithUnhandledErrorListener(cf, unhandledErrorListener);
}

/** Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
final CuratorFramework client =
final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);

try {
Expand All @@ -66,7 +67,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
connectionLossLatch, reconnectedLatch);

ZooKeeperCheckpointIDCounter idCounter =
new ZooKeeperCheckpointIDCounter(client, listener);
new ZooKeeperCheckpointIDCounter(
curatorFrameworkWrapper.asCuratorFramework(), listener);
idCounter.start();

AtomicLong localCounter = new AtomicLong(1L);
Expand All @@ -86,7 +88,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
idCounter.getAndIncrement(),
is(localCounter.getAndIncrement()));
} finally {
client.close();
curatorFrameworkWrapper.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.state.RetrievableStateHandle;
Expand Down Expand Up @@ -81,7 +82,8 @@ public void testRecoverFailsIfDownloadFails() {
final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
new ZooKeeperStateHandleStore<CompletedCheckpoint>(
ZooKeeperUtils.startCuratorFramework(
configuration, NoOpFatalErrorHandler.INSTANCE),
configuration, NoOpFatalErrorHandler.INSTANCE)
.asCuratorFramework(),
new TestingRetrievableStateStorageHelper<>()) {
@Override
public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
Expand Down Expand Up @@ -120,9 +122,10 @@ public void testDiscardingSubsumedCheckpoints() throws Exception {
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

final CuratorFramework client =
final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
final CompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
final CompletedCheckpointStore checkpointStore =
createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());

try {
final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 =
Expand All @@ -141,7 +144,7 @@ public void testDiscardingSubsumedCheckpoints() throws Exception {
// verify that the subsumed checkpoint is discarded
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
} finally {
client.close();
curatorFrameworkWrapper.close();
}
}

Expand All @@ -156,9 +159,10 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception {
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

final CuratorFramework client =
final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
final CompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client);
final CompletedCheckpointStore checkpointStore =
createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());

try {
final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 =
Expand All @@ -172,7 +176,7 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception {
// verify that the checkpoint is discarded
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
} finally {
client.close();
curatorFrameworkWrapper.close();
}
}

Expand Down Expand Up @@ -227,9 +231,11 @@ public void testAddCheckpointWithFailedRemove() throws Exception {
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

final CuratorFramework client =
final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client);

final CompletedCheckpointStore store =
createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());

CountDownLatch discardAttempted = new CountDownLatch(1);
for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception {
new TestingLeaderElectionService();

final CuratorFramework client =
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler);
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler)
.asCuratorFramework();
try (final TestingHighAvailabilityServices highAvailabilityServices =
new TestingHighAvailabilityServicesBuilder()
.setRunningJobsRegistry(
Expand Down
Loading

0 comments on commit 43ff973

Please sign in to comment.