From 43ff97396dd6c6d68cef5d8f57ff1685c6e276b4 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 4 Oct 2021 13:23:28 +0800 Subject: [PATCH] [FLINK-24437][HA]Remove unhandled exception handler from CuratorFramework before closing it --- .../HighAvailabilityServicesUtils.java | 8 +- ...orFrameworkWithUnhandledErrorListener.java | 53 +++++++++ .../zookeeper/ZooKeeperClientHAServices.java | 18 +-- .../zookeeper/ZooKeeperHaServices.java | 41 ++++--- .../flink/runtime/util/ZooKeeperUtils.java | 11 +- ...ZKCheckpointIDCounterMultiServersTest.java | 8 +- ...ZooKeeperCompletedCheckpointStoreTest.java | 24 ++-- .../ZooKeeperDefaultDispatcherRunnerTest.java | 3 +- .../zookeeper/ZooKeeperRegistryTest.java | 11 +- .../ZooKeeperJobGraphStoreWatcherTest.java | 4 +- .../leaderelection/LeaderElectionTest.java | 17 +-- ...rLeaderElectionConnectionHandlingTest.java | 6 +- .../ZooKeeperLeaderElectionTest.java | 110 +++++++++++------- ...LeaderRetrievalConnectionHandlingTest.java | 12 +- .../ZooKeeperLeaderRetrievalTest.java | 12 +- .../util/ZooKeeperUtilsTreeCacheTest.java | 11 +- .../ZooKeeperStateHandleStoreTest.java | 7 +- .../zookeeper/ZooKeeperTestEnvironment.java | 19 ++- 18 files changed, 245 insertions(+), 130 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/CuratorFrameworkWithUnhandledErrorListener.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index a9120f79edb82..d2bcb65d6eb26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -45,8 +45,6 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; - import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.Executor; @@ -147,9 +145,9 @@ public static ClientHighAvailabilityServices createClientHAService( configuration, AddressResolution.TRY_ADDRESS_RESOLUTION); return new StandaloneClientHAServices(webMonitorAddress); case ZOOKEEPER: - final CuratorFramework client = - ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler); - return new ZooKeeperClientHAServices(client, configuration); + return new ZooKeeperClientHAServices( + ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler), + configuration); case FACTORY_CLASS: return createCustomClientHAServices(configuration); default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/CuratorFrameworkWithUnhandledErrorListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/CuratorFrameworkWithUnhandledErrorListener.java new file mode 100644 index 0000000000000..29ad29cb83f10 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/CuratorFrameworkWithUnhandledErrorListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.zookeeper; + +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; + +import java.io.Closeable; + +/** + * A wrapper for curatorFramework and unHandledErrorListener which should be unregister from + * curatorFramework before closing it. + */ +public class CuratorFrameworkWithUnhandledErrorListener implements Closeable { + + private final CuratorFramework client; + + private final UnhandledErrorListener listener; + + public CuratorFrameworkWithUnhandledErrorListener( + CuratorFramework client, UnhandledErrorListener listener) { + this.client = Preconditions.checkNotNull(client); + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + public void close() { + client.getUnhandledErrorListenable().removeListener(listener); + client.close(); + } + + public CuratorFramework asCuratorFramework() { + return client; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java index 525268ead9d8f..7ec4622e6de9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java @@ -22,32 +22,34 @@ import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; - -import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; /** ZooKeeper based implementation for {@link ClientHighAvailabilityServices}. */ public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices { - private final CuratorFramework client; + private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final Configuration configuration; public ZooKeeperClientHAServices( - @Nonnull CuratorFramework client, @Nonnull Configuration configuration) { - this.client = client; - this.configuration = configuration; + @Nonnull CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper, + @Nonnull Configuration configuration) { + this.curatorFrameworkWrapper = Preconditions.checkNotNull(curatorFrameworkWrapper); + this.configuration = Preconditions.checkNotNull(configuration); } @Override public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService( - client, ZooKeeperUtils.getLeaderPathForRestServer(), configuration); + curatorFrameworkWrapper.asCuratorFramework(), + ZooKeeperUtils.getLeaderPathForRestServer(), + configuration); } @Override public void close() throws Exception { - client.close(); + curatorFrameworkWrapper.close(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index c9811da383de8..03c89f5d01321 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -100,49 +100,54 @@ public class ZooKeeperHaServices extends AbstractHaServices { // ------------------------------------------------------------------------ - /** The ZooKeeper client to use. */ - private final CuratorFramework client; + /** The curator resource to use. */ + private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; public ZooKeeperHaServices( - CuratorFramework client, + CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper, Executor executor, Configuration configuration, BlobStoreService blobStoreService) { super(configuration, executor, blobStoreService); - this.client = checkNotNull(client); + this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper); } @Override public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception { return new ZooKeeperCheckpointRecoveryFactory( - ZooKeeperUtils.useNamespaceAndEnsurePath(client, ZooKeeperUtils.getJobsPath()), + ZooKeeperUtils.useNamespaceAndEnsurePath( + curatorFrameworkWrapper.asCuratorFramework(), ZooKeeperUtils.getJobsPath()), configuration, ioExecutor); } @Override public JobGraphStore createJobGraphStore() throws Exception { - return ZooKeeperUtils.createJobGraphs(client, configuration); + return ZooKeeperUtils.createJobGraphs( + curatorFrameworkWrapper.asCuratorFramework(), configuration); } @Override public RunningJobsRegistry createRunningJobsRegistry() { - return new ZooKeeperRunningJobsRegistry(client, configuration); + return new ZooKeeperRunningJobsRegistry( + curatorFrameworkWrapper.asCuratorFramework(), configuration); } @Override protected LeaderElectionService createLeaderElectionService(String leaderPath) { - return ZooKeeperUtils.createLeaderElectionService(client, leaderPath); + return ZooKeeperUtils.createLeaderElectionService( + curatorFrameworkWrapper.asCuratorFramework(), leaderPath); } @Override protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) { - return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath, configuration); + return ZooKeeperUtils.createLeaderRetrievalService( + curatorFrameworkWrapper.asCuratorFramework(), leaderPath, configuration); } @Override public void internalClose() { - client.close(); + curatorFrameworkWrapper.close(); } @Override @@ -198,13 +203,17 @@ private void deleteZNode(String path) throws Exception { // The retry logic can be removed once we upgrade to Curator version >= 4.0.1. boolean zNodeDeleted = false; while (!zNodeDeleted) { - Stat stat = client.checkExists().forPath(path); + Stat stat = curatorFrameworkWrapper.asCuratorFramework().checkExists().forPath(path); if (stat == null) { logger.debug("znode {} has been deleted", path); return; } try { - client.delete().deletingChildrenIfNeeded().forPath(path); + curatorFrameworkWrapper + .asCuratorFramework() + .delete() + .deletingChildrenIfNeeded() + .forPath(path); zNodeDeleted = true; } catch (KeeperException.NoNodeException ignored) { // concurrent delete operation. Try again. @@ -225,8 +234,12 @@ private void deleteZNode(String path) throws Exception { */ private void tryDeleteEmptyParentZNodes() throws Exception { // try to delete the parent znodes if they are empty - String remainingPath = getParentPath(getNormalizedPath(client.getNamespace())); - final CuratorFramework nonNamespaceClient = client.usingNamespace(null); + String remainingPath = + getParentPath( + getNormalizedPath( + curatorFrameworkWrapper.asCuratorFramework().getNamespace())); + final CuratorFramework nonNamespaceClient = + curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null); while (!isRootPath(remainingPath)) { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index bfe4bac05c861..79d01586c416d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -153,9 +154,9 @@ public static String generateLeaderLatchPath(String path) { * @param configuration {@link Configuration} object containing the configuration values * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected * errors of {@link CuratorFramework} - * @return {@link CuratorFramework} instance + * @return {@link CuratorFrameworkWithUnhandledErrorListener} instance */ - public static CuratorFramework startCuratorFramework( + public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework( Configuration configuration, FatalErrorHandler fatalErrorHandler) { checkNotNull(configuration, "configuration"); String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM); @@ -239,10 +240,10 @@ public static CuratorFramework startCuratorFramework( * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework. * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected * errors of {@link CuratorFramework} - * @return {@link CuratorFramework} instance + * @return {@link CuratorFrameworkWithUnhandledErrorListener} instance */ @VisibleForTesting - public static CuratorFramework startCuratorFramework( + public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework( CuratorFrameworkFactory.Builder builder, FatalErrorHandler fatalErrorHandler) { CuratorFramework cf = builder.build(); UnhandledErrorListener unhandledErrorListener = @@ -258,7 +259,7 @@ public static CuratorFramework startCuratorFramework( }; cf.getUnhandledErrorListenable().addListener(unhandledErrorListener); cf.start(); - return cf; + return new CuratorFrameworkWithUnhandledErrorListener(cf, unhandledErrorListener); } /** Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java index 7a7f9ddcc9d4f..e474de3303d53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; @@ -54,7 +55,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception { final Configuration configuration = new Configuration(); configuration.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); - final CuratorFramework client = + final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); try { @@ -66,7 +67,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception { connectionLossLatch, reconnectedLatch); ZooKeeperCheckpointIDCounter idCounter = - new ZooKeeperCheckpointIDCounter(client, listener); + new ZooKeeperCheckpointIDCounter( + curatorFrameworkWrapper.asCuratorFramework(), listener); idCounter.start(); AtomicLong localCounter = new AtomicLong(1L); @@ -86,7 +88,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception { idCounter.getAndIncrement(), is(localCounter.getAndIncrement())); } finally { - client.close(); + curatorFrameworkWrapper.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 9c71fcec6b450..273d8bf119749 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; import org.apache.flink.runtime.state.RetrievableStateHandle; @@ -81,7 +82,8 @@ public void testRecoverFailsIfDownloadFails() { final ZooKeeperStateHandleStore checkpointsInZooKeeper = new ZooKeeperStateHandleStore( ZooKeeperUtils.startCuratorFramework( - configuration, NoOpFatalErrorHandler.INSTANCE), + configuration, NoOpFatalErrorHandler.INSTANCE) + .asCuratorFramework(), new TestingRetrievableStateStorageHelper<>()) { @Override public List, String>> @@ -120,9 +122,10 @@ public void testDiscardingSubsumedCheckpoints() throws Exception { configuration.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); - final CuratorFramework client = + final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); - final CompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client); + final CompletedCheckpointStore checkpointStore = + createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework()); try { final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = @@ -141,7 +144,7 @@ public void testDiscardingSubsumedCheckpoints() throws Exception { // verify that the subsumed checkpoint is discarded CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1); } finally { - client.close(); + curatorFrameworkWrapper.close(); } } @@ -156,9 +159,10 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception { configuration.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); - final CuratorFramework client = + final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); - final CompletedCheckpointStore checkpointStore = createZooKeeperCheckpointStore(client); + final CompletedCheckpointStore checkpointStore = + createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework()); try { final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = @@ -172,7 +176,7 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception { // verify that the checkpoint is discarded CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1); } finally { - client.close(); + curatorFrameworkWrapper.close(); } } @@ -227,9 +231,11 @@ public void testAddCheckpointWithFailedRemove() throws Exception { configuration.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); - final CuratorFramework client = + final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); - final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client); + + final CompletedCheckpointStore store = + createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework()); CountDownLatch discardAttempted = new CountDownLatch(1); for (long i = 0; i < numCheckpointsToRetain + 1; ++i) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index e64f7c4dc6376..88bd8994878b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -143,7 +143,8 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception { new TestingLeaderElectionService(); final CuratorFramework client = - ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler); + ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler) + .asCuratorFramework(); try (final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder() .setRunningJobsRegistry( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java index 4f61991954cdf..7849274500a52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java @@ -30,8 +30,6 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; -import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; - import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; @@ -65,12 +63,13 @@ public void testZooKeeperRegistry() throws Exception { HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - final CuratorFramework zkClient = - ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); - final HighAvailabilityServices zkHaService = new ZooKeeperHaServices( - zkClient, Executors.directExecutor(), configuration, new VoidBlobStore()); + ZooKeeperUtils.startCuratorFramework( + configuration, NoOpFatalErrorHandler.INSTANCE), + Executors.directExecutor(), + configuration, + new VoidBlobStore()); final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java index 3667d24bd9dff..e8a2f9f7848f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; @@ -71,9 +72,10 @@ public void setup() throws Exception { @Test public void testJobGraphAddedAndRemovedShouldNotifyGraphStoreListener() throws Exception { - try (final CuratorFramework client = + try (final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, NoOpFatalErrorHandler.INSTANCE)) { + final CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework(); final JobGraphStoreWatcher jobGraphStoreWatcher = createAndStartJobGraphStoreWatcher(client); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index a0b9012965c91..33e856772e7ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -21,14 +21,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.testutils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; - import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; @@ -172,7 +171,7 @@ private static final class ZooKeeperServiceClass implements ServiceClass { private TestingServer testingServer; - private CuratorFramework client; + private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private Configuration configuration; @@ -190,14 +189,15 @@ public void setup(FatalErrorHandler fatalErrorHandler) throws Exception { HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - client = ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler); + curatorFrameworkWrapper = + ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler); } @Override public void teardown() throws Exception { - if (client != null) { - client.close(); - client = null; + if (curatorFrameworkWrapper != null) { + curatorFrameworkWrapper.close(); + curatorFrameworkWrapper = null; } if (testingServer != null) { @@ -208,7 +208,8 @@ public void teardown() throws Exception { @Override public LeaderElectionService createLeaderElectionService() throws Exception { - return ZooKeeperUtils.createLeaderElectionService(client); + return ZooKeeperUtils.createLeaderElectionService( + curatorFrameworkWrapper.asCuratorFramework()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java index a0f2aa0d51e25..1f49f2c3b52e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; @@ -129,9 +130,10 @@ private void runTestWithZooKeeperConnectionProblem( validationLogic, Problem problem) throws Exception { - CuratorFramework client = + CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, fatalErrorHandlerResource.getFatalErrorHandler()); + CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework(); LeaderElectionDriverFactory leaderElectionDriverFactory = new ZooKeeperLeaderElectionDriverFactory(client, PATH); DefaultLeaderElectionService leaderElectionService = @@ -162,7 +164,7 @@ private void runTestWithZooKeeperConnectionProblem( validationLogic.accept(connectionStateListener, contender); } finally { leaderElectionService.stop(); - client.close(); + curatorFrameworkWrapper.close(); if (problem == Problem.LOST_CONNECTION) { // in case of lost connections we accept that some unhandled error can occur diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index b7b096eb8dd3b..8ac2d66462bf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver; import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler; @@ -99,7 +100,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { private Configuration configuration; - private CuratorFramework client; + private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private static final String TEST_URL = "akka//user/jobmanager"; private static final LeaderInformation TEST_LEADER = @@ -126,16 +127,16 @@ public void before() { HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - client = + curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, testingFatalErrorHandlerResource.getFatalErrorHandler()); } @After public void after() throws IOException { - if (client != null) { - client.close(); - client = null; + if (curatorFrameworkWrapper != null) { + curatorFrameworkWrapper.close(); + curatorFrameworkWrapper = null; } if (testingServer != null) { @@ -156,9 +157,12 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception { LeaderRetrievalDriver leaderRetrievalDriver = null; try { - leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); + leaderElectionDriver = + createAndInitLeaderElectionDriver( + curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) + ZooKeeperUtils.createLeaderRetrievalDriverFactory( + curatorFrameworkWrapper.asCuratorFramework()) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -201,14 +205,18 @@ public void testZooKeeperReelection() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); + leaderRetrievalService = + ZooKeeperUtils.createLeaderRetrievalService( + curatorFrameworkWrapper.asCuratorFramework()); LOG.debug("Start leader retrieval service for the TestingListener."); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); + leaderElectionService[i] = + ZooKeeperUtils.createLeaderElectionService( + curatorFrameworkWrapper.asCuratorFramework()); contenders[i] = new TestingContender(createAddress(i), leaderElectionService[i]); LOG.debug("Start leader election service for contender #{}.", i); @@ -289,12 +297,16 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); + leaderRetrievalService = + ZooKeeperUtils.createLeaderRetrievalService( + curatorFrameworkWrapper.asCuratorFramework()); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); + leaderElectionService[i] = + ZooKeeperUtils.createLeaderElectionService( + curatorFrameworkWrapper.asCuratorFramework()); contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]); @@ -322,7 +334,8 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { leaderElectionService[index].stop(); // create new leader election service which takes part in the leader election leaderElectionService[index] = - ZooKeeperUtils.createLeaderElectionService(client); + ZooKeeperUtils.createLeaderElectionService( + curatorFrameworkWrapper.asCuratorFramework()); contenders[index] = new TestingContender( TEST_URL + "_" + index + "_" + (lastTry + 1), @@ -364,16 +377,18 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { ZooKeeperLeaderElectionDriver leaderElectionDriver = null; LeaderRetrievalDriver leaderRetrievalDriver = null; - CuratorFramework anotherClient = null; + CuratorFrameworkWithUnhandledErrorListener anotherCuratorFrameworkWrapper = null; try { - leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); + leaderElectionDriver = + createAndInitLeaderElectionDriver( + curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler); electionEventHandler.waitForLeader(timeout); assertThat(electionEventHandler.getConfirmedLeaderInformation(), is(TEST_LEADER)); - anotherClient = + anotherCuratorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, NoOpFatalErrorHandler.INSTANCE); @@ -392,10 +407,16 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { leaderElectionDriver.getConnectionInformationPath(); while (!dataWritten) { - anotherClient.delete().forPath(connectionInformationPath); + anotherCuratorFrameworkWrapper + .asCuratorFramework() + .delete() + .forPath(connectionInformationPath); try { - anotherClient.create().forPath(connectionInformationPath, baos.toByteArray()); + anotherCuratorFrameworkWrapper + .asCuratorFramework() + .create() + .forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NodeExistsException e) { @@ -405,7 +426,8 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { // The faulty leader should be corrected on ZooKeeper leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) + ZooKeeperUtils.createLeaderRetrievalDriverFactory( + curatorFrameworkWrapper.asCuratorFramework()) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -425,8 +447,8 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { if (leaderRetrievalDriver != null) { leaderRetrievalDriver.close(); } - if (anotherClient != null) { - anotherClient.close(); + if (anotherCuratorFrameworkWrapper != null) { + anotherCuratorFrameworkWrapper.close(); } } } @@ -446,12 +468,11 @@ public void testExceptionForwarding() throws Exception { mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS); final String exMsg = "Test exception"; final Exception testException = new Exception(exMsg); + final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = + ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE); try { - client = - spy( - ZooKeeperUtils.startCuratorFramework( - configuration, NoOpFatalErrorHandler.INSTANCE)); + client = spy(curatorFrameworkWrapper.asCuratorFramework()); doAnswer(invocation -> mockCreateBuilder).when(client).create(); @@ -476,8 +497,8 @@ public void testExceptionForwarding() throws Exception { leaderElectionDriver.close(); } - if (client != null) { - client.close(); + if (curatorFrameworkWrapper != null) { + curatorFrameworkWrapper.close(); } } } @@ -496,25 +517,31 @@ public void testEphemeralZooKeeperNodes() throws Exception { final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler(); - CuratorFramework client = null; - CuratorFramework client2 = null; + CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = null; + CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper2 = null; NodeCache cache = null; try { - client = + curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, testingFatalErrorHandlerResource.getFatalErrorHandler()); - client2 = + curatorFrameworkWrapper2 = ZooKeeperUtils.startCuratorFramework( configuration, testingFatalErrorHandlerResource.getFatalErrorHandler()); - leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); + leaderElectionDriver = + createAndInitLeaderElectionDriver( + curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2) + ZooKeeperUtils.createLeaderRetrievalDriverFactory( + curatorFrameworkWrapper2.asCuratorFramework()) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); - cache = new NodeCache(client2, leaderElectionDriver.getConnectionInformationPath()); + cache = + new NodeCache( + curatorFrameworkWrapper2.asCuratorFramework(), + leaderElectionDriver.getConnectionInformationPath()); ExistsCacheListener existsListener = new ExistsCacheListener(cache); DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache); @@ -535,7 +562,7 @@ public void testEphemeralZooKeeperNodes() throws Exception { leaderElectionDriver.close(); // now stop the underlying client - client.close(); + curatorFrameworkWrapper.close(); Future deletedFuture = deletedCacheListener.nodeDeleted(); @@ -561,8 +588,8 @@ public void testEphemeralZooKeeperNodes() throws Exception { cache.close(); } - if (client2 != null) { - client2.close(); + if (curatorFrameworkWrapper2 != null) { + curatorFrameworkWrapper2.close(); } } } @@ -578,7 +605,9 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { - leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); + leaderElectionDriver = + createAndInitLeaderElectionDriver( + curatorFrameworkWrapper.asCuratorFramework(), electionEventHandler); electionEventHandler.waitForLeader(timeout); assertThat(electionEventHandler.getConfirmedLeaderInformation(), is(TEST_LEADER)); @@ -591,7 +620,8 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception is(LeaderInformation.empty())); // The data on ZooKeeper it not be cleared leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) + ZooKeeperUtils.createLeaderRetrievalDriverFactory( + curatorFrameworkWrapper.asCuratorFramework()) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -644,9 +674,9 @@ public List getAclForPath(String s) { }) .namespace("flink"); - try (CuratorFramework clientWithErrorHandler = + try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler)) { - + CuratorFramework clientWithErrorHandler = curatorFrameworkWrapper.asCuratorFramework(); assertFalse(fatalErrorHandler.getErrorFuture().isDone()); leaderElectionDriver = createAndInitLeaderElectionDriver(clientWithErrorHandler, electionEventHandler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java index cb2ce3121a61a..2ff9053159d8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalConnectionHandlingTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; @@ -61,6 +62,8 @@ public class ZooKeeperLeaderRetrievalConnectionHandlingTest extends TestLogger { private TestingServer testingServer; + private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; + private CuratorFramework zooKeeperClient; @Rule @@ -75,9 +78,10 @@ public void before() throws Exception { config.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - zooKeeperClient = + curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( config, fatalErrorHandlerResource.getFatalErrorHandler()); + zooKeeperClient = curatorFrameworkWrapper.asCuratorFramework(); zooKeeperClient.blockUntilConnected(); } @@ -85,9 +89,9 @@ public void before() throws Exception { public void after() throws Exception { closeTestServer(); - if (zooKeeperClient != null) { - zooKeeperClient.close(); - zooKeeperClient = null; + if (curatorFrameworkWrapper != null) { + curatorFrameworkWrapper.close(); + curatorFrameworkWrapper = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java index 48ebe3d2c0f4b..17509cd274c06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java @@ -34,8 +34,6 @@ import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; - import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; @@ -77,13 +75,13 @@ public void before() throws Exception { config.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - CuratorFramework client = - ZooKeeperUtils.startCuratorFramework( - config, testingFatalErrorHandlerResource.getFatalErrorHandler()); - highAvailabilityServices = new ZooKeeperHaServices( - client, TestingUtils.defaultExecutor(), config, new VoidBlobStore()); + ZooKeeperUtils.startCuratorFramework( + config, testingFatalErrorHandlerResource.getFatalErrorHandler()), + TestingUtils.defaultExecutor(), + config, + new VoidBlobStore()); } @After diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java index d964520c95ea4..73a8d9bbc21c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; import org.apache.flink.util.TestLogger; @@ -52,6 +53,7 @@ public class ZooKeeperUtilsTreeCacheTest extends TestLogger { private Closer closer; private CuratorFramework client; + private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final AtomicReference> callbackFutureReference = new AtomicReference<>(); @@ -65,14 +67,13 @@ public void setUp() throws Exception { configuration.set( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - client = - closer.register( - ZooKeeperUtils.startCuratorFramework( - configuration, NoOpFatalErrorHandler.INSTANCE)); - client = + curatorFrameworkWrapper = closer.register( ZooKeeperUtils.startCuratorFramework( configuration, NoOpFatalErrorHandler.INSTANCE)); + + client = curatorFrameworkWrapper.asCuratorFramework(); + final TreeCache cache = closer.register( ZooKeeperUtils.createTreeCache( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index 32159f4005fd1..cea73ae671da9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.persistence.IntegerResourceVersion; import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; @@ -783,13 +784,15 @@ public void testLockCleanupWhenClientTimesOut() throws Exception { configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100); configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout"); - try (CuratorFramework client = + try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework( configuration, NoOpFatalErrorHandler.INSTANCE); - CuratorFramework client2 = + CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper2 = ZooKeeperUtils.startCuratorFramework( configuration, NoOpFatalErrorHandler.INSTANCE)) { + CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework(); + CuratorFramework client2 = curatorFrameworkWrapper2.asCuratorFramework(); ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(client, longStateStorage); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java index 8ba50272a5984..c23feda6ad5ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; import org.apache.flink.runtime.util.ExitJVMFatalErrorHandler; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -43,6 +44,8 @@ public class ZooKeeperTestEnvironment { private final CuratorFramework client; + private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; + /** * Starts a ZooKeeper cluster with the number of quorum peers and a client. * @@ -75,7 +78,10 @@ public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) { zooKeeperCluster.getConnectString()); } - client = ZooKeeperUtils.startCuratorFramework(conf, ExitJVMFatalErrorHandler.INSTANCE); + curatorFrameworkWrapper = + ZooKeeperUtils.startCuratorFramework(conf, ExitJVMFatalErrorHandler.INSTANCE); + + client = curatorFrameworkWrapper.asCuratorFramework(); client.newNamespaceAwareEnsurePath("/").ensure(client.getZookeeperClient()); } catch (Exception e) { @@ -85,8 +91,8 @@ public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) { /** Shutdown the client and ZooKeeper server/cluster. */ public void shutdown() throws Exception { - if (client != null) { - client.close(); + if (curatorFrameworkWrapper != null) { + curatorFrameworkWrapper.close(); } if (zooKeeperServer != null) { @@ -124,13 +130,6 @@ public List getChildren(String path) throws Exception { return client.getChildren().forPath(path); } - /** Creates a new client for the started ZooKeeper server/cluster. */ - public CuratorFramework createClient() { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, getConnectString()); - return ZooKeeperUtils.startCuratorFramework(config, ExitJVMFatalErrorHandler.INSTANCE); - } - /** * Deletes all ZNodes under the root node. *