Skip to content

Commit

Permalink
[FLINK-19863][tests][hbase] Harden HBase end-to-end tests
Browse files Browse the repository at this point in the history
This commit checks the HBase processors and data dir have been cleaned up after shutting down a HBase cluster. And also checks the required resources, e.g. Zookeeper, HBase meta has been available when starting up a new HBase cluster.

This closes apache#14032
  • Loading branch information
leonardBang committed Nov 17, 2020
1 parent 4cc0e72 commit 6e30032
Showing 1 changed file with 71 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -46,6 +47,8 @@ public class LocalStandaloneHBaseResource implements HBaseResource {

private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);

private static final int MAX_RETRIES = 3;
private static final int RETRY_INTERVAL_SECONDS = 30;
private final TemporaryFolder tmp = new TemporaryFolder();

private final DownloadCache downloadCache = DownloadCache.get();
Expand All @@ -68,7 +71,7 @@ public void before() throws Exception {
tmp.create();
downloadCache.before();

this.hbaseDir = tmp.newFolder("hbase").toPath().toAbsolutePath();
this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
setupHBaseDist();
setupHBaseCluster();
}
Expand All @@ -92,62 +95,98 @@ private void setupHBaseDist() throws IOException {
}

private void setupHBaseCluster() throws IOException {
LOG.info("Starting HBase cluster");
AutoClosableProcess.runBlocking(
hbaseDir.resolve(Paths.get("bin", "start-hbase.sh")).toString());

while (!isHBaseRunning()) {
try {
LOG.info("Waiting for HBase to start");
Thread.sleep(500L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
LOG.info("Starting HBase cluster...");
runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
LOG.info("Start HBase cluster success");
}

@Override
public void afterTestSuccess() {
shutdownResource();
downloadCache.afterTestSuccess();
tmp.delete();
}

private void shutdownResource() {
LOG.info("Stopping HBase Cluster...");
try {
LOG.info("Stopping HBase Cluster");
AutoClosableProcess.runBlocking(
hbaseDir.resolve(Paths.get("bin", "hbase-daemon.sh")).toString(),
"stop",
"master");
runHBaseProcessWithRetry("stop-hbase.sh", () -> isHMasterAlive());
} catch (IOException ioe) {
LOG.warn("Error when shutting down HBase Cluster.", ioe);
}
LOG.info("Stop HBase Cluster success");
}

private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker) throws IOException {
LOG.info("Execute {} for HBase Cluster", command);

while (isHBaseRunning()) {
for (int i = 1; i <= MAX_RETRIES; i++) {
try {
AutoClosableProcess.runBlocking(
hbaseDir.resolve(Paths.get("bin", command)).toString());
} catch (IOException ioe) {
LOG.warn("Get exception when execute {} ", command, ioe);
}

int waitSecond = 0;
while (processStatusChecker.get()) {
try {
LOG.info("Waiting for HBase to stop");
Thread.sleep(500L);
LOG.info("Waiting for HBase {} works", command);
Thread.sleep(1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("sleep interrupted", e);
}
waitSecond++;
if (waitSecond > RETRY_INTERVAL_SECONDS) {
break;
}
}

if (waitSecond < RETRY_INTERVAL_SECONDS) {
break;
} else {
if (i == MAX_RETRIES) {
LOG.error("Execute {} failed, retry times {}", command, i);
throw new IllegalArgumentException(String.format(
"Execute %s failed aftert retry %s times", command, i));
} else {
LOG.warn("Execute {} failed, retry times {}", command, i);
}
}
}
}

private boolean isHMasterRunning() {
try {
final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
queryHBaseStatus(line ->
atomicHMasterStarted.compareAndSet(false, line.contains("hbase:namespace")));
return atomicHMasterStarted.get();
} catch (IOException ioe) {
LOG.warn("Error while shutting down hbase.", ioe);
return false;
}
downloadCache.afterTestSuccess();
tmp.delete();
}

private static boolean isHBaseRunning() {
private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
}

private boolean isHMasterAlive() {
try {
final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false);
queryHMasterStatus(line -> atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
queryHBaseProcess(line ->
atomicHMasterStarted.compareAndSet(false, line.contains("HMaster")));
return atomicHMasterStarted.get();
} catch (IOException ioe) {
return false;
}
}

private static void queryHMasterStatus(final Consumer<String> stdoutProcessor) throws IOException {
private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
AutoClosableProcess
.create("jps")
.setStdoutProcessor(stdoutProcessor)
.runBlocking();
.create("jps")
.setStdoutProcessor(stdoutProcessor)
.runBlocking();
}

@Override
Expand Down

0 comments on commit 6e30032

Please sign in to comment.