-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-28150][sql-gateway][hive] Introduce the hiveserver2 endpoint and factory #20101
Conversation
37c3c38
to
5dda594
Compare
40d67f3
to
4b5c49f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fsk119 The codes generally look good to me. I left some minor comments, PTAL. Thanks!
...k-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
Outdated
Show resolved
Hide resolved
...k-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
Outdated
Show resolved
Hide resolved
// Server Options | ||
// -------------------------------------------------------------------------------------------- | ||
|
||
public static final ConfigOption<Integer> THRIFT_PORT = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these options are scoped to Hive endpoint, should we add a prefix like hive-endpoint.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the config option name should contain a prefix? Actually, we already finished this in the SqlGatewayEndpointFactoryUtils#createSqlGatewayEndpoint
. The util will automatically search the options that begin with the sql-gateway.endpoint.<identifier>
, e.g. sql-gateway.endpoint.hiveserver2
here. You can take a look at the test in the SqlGatewayEndpointFactoryUtilsTest#testCreateEndpoints
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM now.
...gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
Outdated
Show resolved
Hide resolved
...gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java
Show resolved
Hide resolved
public void close() { | ||
operationManager.close(); | ||
for (String name : sessionState.catalogManager.listCatalogs()) { | ||
sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be a CatalogException when closing a catalog. I think it'd be better to wrap it with a try-catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It might get a resource leak if one of the catalogs fails to close.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contribution. I left some comments.
? Collections.emptyMap() | ||
: tOpenSessionReq.getConfiguration(); | ||
Map<String, String> sessionConfig = new HashMap<>(); | ||
sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hive dialect is the default dialect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We want the HiveSever2 endpoint is compatible with HiveServer2. With the HiveServer2 endpoint, users can use hive dialect directly.
...or-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
Show resolved
Hide resolved
...or-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
Outdated
Show resolved
Hide resolved
TProtocolVersion clientProtocol = tOpenSessionReq.getClient_protocol(); | ||
HiveServer2EndpointVersion sessionVersion = | ||
HiveServer2EndpointVersion.valueOf( | ||
TProtocolVersion.findByValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TProtocolVersion.findByValue
may return null, then HiveServer2EndpointVersion.valueOf(null) will throw the exception will be the message Unknown TProtocolVersion: null
, which may not user friendly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it's impossible because we use min(server_version, ...)
here. The negotiation results should be less than the Server Version.
|
||
HIVE_CLI_SERVICE_PROTOCOL_V9(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9), | ||
|
||
HIVE_CLI_SERVICE_PROTOCOL_V10(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The version for Hive3 will be up to TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
. But since miss it seems won't cause any problem, it's just a reminder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing out! After offline discussion, I think the only way to support HiveSever2 in Hive 3.x is to expose getHiveServer2Endpoint
and getHiveServer2EndpointVersion
in HiveShim
. This is because HiveServer2Endpoint implements TCLIService.Iface
and the interface is different in the different hive version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering the HiveServer2EndpointVersion
and HiveServer2Endpoint
is not annotated with @PublicEvolving
, I think we can expose this in the future.
} | ||
|
||
@Test | ||
public void testOpenSessionWithConfig() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it for open session with config? Seems we haven't set configuration in this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I add more configs here.
...ector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
Outdated
Show resolved
Hide resolved
7622bbe
to
e866c29
Compare
62480f7
to
104a4d7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@flinkbot run azure |
104a4d7
to
d889c33
Compare
…nd factory This closes apache#20101
What is the purpose of the change
Introduce the HiveServer2 Endpoint and its Factory. With the HiveServer2 Endpoint, users is avaliable to connect to the SqlGateway with Hive JDBC.
Brief change log
thrift.max.message.size
,thrift.login.timeout
,thrift.exponential.backoff.slot.length
. The newly introduced options are align with the hive, please refer to the HiveConf. The PR also introduce the optionmodule.name
that aligns with the optioncatalog.name
as the default hive module name.OpenSession
andCloseSession
in HiveServer2Endpoint.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation