From 6e300321d7b8acbe9da9b423281524b550d22e3c Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Tue, 17 Nov 2020 10:03:52 +0800 Subject: [PATCH] [FLINK-19863][tests][hbase] Harden HBase end-to-end tests This commit checks that the HBase processes and data dir have been cleaned up after shutting down an HBase cluster. It also checks that the required resources, e.g. ZooKeeper and the HBase meta table, are available when starting up a new HBase cluster. This closes #14032 --- .../hbase/LocalStandaloneHBaseResource.java | 103 ++++++++++++------ 1 file changed, 71 insertions(+), 32 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java index 2478432d222aa..d8c03d72cf5e9 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -46,6 +47,8 @@ public class LocalStandaloneHBaseResource implements HBaseResource { private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class); + private static final int MAX_RETRIES = 3; + private static final int RETRY_INTERVAL_SECONDS = 30; private final TemporaryFolder tmp = new TemporaryFolder(); private final DownloadCache downloadCache = DownloadCache.get(); @@ -68,7 +71,7 @@ public void before() throws Exception { tmp.create(); downloadCache.before(); - this.hbaseDir = tmp.newFolder("hbase").toPath().toAbsolutePath(); + this.hbaseDir = tmp.newFolder("hbase-" + 
hbaseVersion).toPath().toAbsolutePath(); setupHBaseDist(); setupHBaseCluster(); } @@ -92,62 +95,98 @@ private void setupHBaseDist() throws IOException { } private void setupHBaseCluster() throws IOException { - LOG.info("Starting HBase cluster"); - AutoClosableProcess.runBlocking( - hbaseDir.resolve(Paths.get("bin", "start-hbase.sh")).toString()); - - while (!isHBaseRunning()) { - try { - LOG.info("Waiting for HBase to start"); - Thread.sleep(500L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } + LOG.info("Starting HBase cluster..."); + runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning()); + LOG.info("Start HBase cluster success"); } @Override public void afterTestSuccess() { + shutdownResource(); + downloadCache.afterTestSuccess(); + tmp.delete(); + } + + private void shutdownResource() { + LOG.info("Stopping HBase Cluster..."); try { - LOG.info("Stopping HBase Cluster"); - AutoClosableProcess.runBlocking( - hbaseDir.resolve(Paths.get("bin", "hbase-daemon.sh")).toString(), - "stop", - "master"); + runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive()); + } catch (IOException ioe) { + LOG.warn("Error when shutting down HBase Cluster.", ioe); + } + LOG.info("Stop HBase Cluster success"); + } + + private void runHBaseProcessWithRetry(String command, Supplier processStatusChecker) throws IOException { + LOG.info("Execute {} for HBase Cluster", command); - while (isHBaseRunning()) { + for (int i = 1; i <= MAX_RETRIES; i++) { + try { + AutoClosableProcess.runBlocking( + hbaseDir.resolve(Paths.get("bin", command)).toString()); + } catch (IOException ioe) { + LOG.warn("Get exception when execute {} ", command, ioe); + } + + int waitSecond = 0; + while (processStatusChecker.get()) { try { - LOG.info("Waiting for HBase to stop"); - Thread.sleep(500L); + LOG.info("Waiting for HBase {} works", command); + Thread.sleep(1000L); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + 
LOG.warn("sleep interrupted", e); + } + waitSecond++; + if (waitSecond > RETRY_INTERVAL_SECONDS) { break; } } + if (waitSecond < RETRY_INTERVAL_SECONDS) { + break; + } else { + if (i == MAX_RETRIES) { + LOG.error("Execute {} failed, retry times {}", command, i); + throw new IllegalArgumentException(String.format( + "Execute %s failed aftert retry %s times", command, i)); + } else { + LOG.warn("Execute {} failed, retry times {}", command, i); + } + } + } + } + + private boolean isHMasterRunning() { + try { + final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false); + queryHBaseStatus(line -> + atomicHMasterStarted.compareAndSet(false, line.contains("hbase:namespace"))); + return atomicHMasterStarted.get(); } catch (IOException ioe) { - LOG.warn("Error while shutting down hbase.", ioe); + return false; } - downloadCache.afterTestSuccess(); - tmp.delete(); } - private static boolean isHBaseRunning() { + private void queryHBaseStatus(final Consumer stdoutProcessor) throws IOException { + executeHBaseShell("scan 'hbase:meta'", stdoutProcessor); + } + + private boolean isHMasterAlive() { try { final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false); - queryHMasterStatus(line -> atomicHMasterStarted.compareAndSet(false, line.contains("HMaster"))); + queryHBaseProcess(line -> + atomicHMasterStarted.compareAndSet(false, line.contains("HMaster"))); return atomicHMasterStarted.get(); } catch (IOException ioe) { return false; } } - private static void queryHMasterStatus(final Consumer stdoutProcessor) throws IOException { + private void queryHBaseProcess(final Consumer stdoutProcessor) throws IOException { AutoClosableProcess - .create("jps") - .setStdoutProcessor(stdoutProcessor) - .runBlocking(); + .create("jps") + .setStdoutProcessor(stdoutProcessor) + .runBlocking(); } @Override