Skip to content

Commit

Permalink
[fix] fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 committed Jul 22, 2022
1 parent d889c33 commit 104a4d7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.util.ExceptionUtils;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.rpc.thrift.TCLIService;
Expand Down Expand Up @@ -97,6 +98,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -133,11 +135,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
private final int backOffSlotLengthMs;
private final long maxMessageSize;
private final int port;
private final CountDownLatch latch;

private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint");
private ThreadPoolExecutor executor;
private TThreadPoolServer server;

private volatile Throwable error;

// --------------------------------------------------------------------------------------------
// Catalog attributes
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -206,6 +211,7 @@ public HiveServer2Endpoint(
this.minWorkerThreads = minWorkerThreads;
this.maxWorkerThreads = maxWorkerThreads;
this.workerKeepAliveTime = checkNotNull(workerKeepAliveTime);
this.latch = new CountDownLatch(1);

this.catalogName = checkNotNull(catalogName);
this.hiveConfPath = hiveConfPath;
Expand All @@ -232,6 +238,14 @@ public void stop() throws Exception {
}
}

@Override
public void awaitTermination() throws Exception {
latch.await();
if (error != null) {
ExceptionUtils.rethrowException(error, error.getMessage());
}
}

@Override
public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TException {
LOG.debug("Client protocol version: {}.", tOpenSessionReq.getClient_protocol());
Expand Down Expand Up @@ -457,7 +471,9 @@ public void run() {
server.serve();
} catch (Throwable t) {
LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t);
error = t;
}
latch.countDown();
}

private void buildTThreadPoolServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface SqlGatewayEndpoint {

/** Terminate the endpoint. */
void stop() throws Exception;

default void awaitTermination() throws Exception {}
}

0 comments on commit 104a4d7

Please sign in to comment.