
[FLINK-21727][hive] Support DDL in HiveParser #15151

Merged
merged 2 commits into from
Mar 16, 2021

Conversation

lirui-apache
Contributor

What is the purpose of the change

Support DDLs in the Hive parser.

Brief change log

  • Add HiveParserDDLSemanticAnalyzer to analyze DDLs.
  • Add DDLOperationConverter to convert the parsed DDLs into operations.
  • Handle DDLs in HiveParser. DQL and DML are still handled by the super class.
  • Hive's temp function is more like Flink's temp system function. Added an operation to create a temp system function with a CatalogFunction instance.
  • Misc changes needed to support DDLs.
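The dispatch described in the change log (DDLs analyzed by HiveParserDDLSemanticAnalyzer, DQL/DML delegated to the super class) can be sketched as follows. This is a minimal, self-contained illustration; the classifier and class names are hypothetical simplifications, not Flink's actual parser API.

```java
// Hypothetical sketch of the DDL-vs-query dispatch described above.
// Class and method names are simplified stand-ins, not Flink's real classes.
public class HiveParserSketch {

    // A trivial keyword-based classifier; the real parser builds a Hive AST
    // first and decides based on the root node type.
    static boolean isDdl(String sql) {
        String head = sql.trim().toUpperCase().split("\\s+")[0];
        return head.equals("CREATE") || head.equals("DROP") || head.equals("ALTER");
    }

    static String parse(String sql) {
        if (isDdl(sql)) {
            // DDLs are analyzed by the Hive-dialect DDL analyzer.
            return "analyzed by HiveParserDDLSemanticAnalyzer";
        }
        // DQL and DML still go to the default (super) parser for now.
        return "delegated to super.parse()";
    }

    public static void main(String[] args) {
        System.out.println(parse("CREATE TABLE t (x INT)"));
        System.out.println(parse("SELECT * FROM t"));
    }
}
```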

Verifying this change

Existing and added test cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? doc

@flinkbot
Collaborator

flinkbot commented Mar 11, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ec1a240 (Sat Aug 28 11:13:34 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Mar 11, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Member

@wuchong wuchong left a comment


Thanks @lirui-apache for the great work. I left some comments.

DDLOperationConverter ddlConverter =
new DDLOperationConverter(getCatalogManager(), hiveShim);
if (work instanceof HiveParserCreateViewDesc) {
return super.parse(cmd);
Member


Why is this delegated to the super parser?

Contributor Author


Because HiveParser cannot parse queries at the moment.

}

private Operation convertShowFunctions(ShowFunctionsDesc desc) {
return new ShowFunctionsOperation();
Member


Disable the LIKE pattern for now?

Contributor Author


HiveParserDDLSemanticAnalyzer::analyzeShowFunctions would throw an exception for "SHOW FUNCTIONS LIKE".
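The guard described here can be sketched as follows. This is a minimal, self-contained illustration with a hypothetical class and exception type; Flink's actual HiveParserDDLSemanticAnalyzer works on the parsed AST, not on raw SQL text.

```java
// Hypothetical sketch: reject "SHOW FUNCTIONS LIKE" until the pattern is supported,
// instead of silently ignoring the LIKE clause.
public class ShowFunctionsGuard {

    static String analyzeShowFunctions(String sql) {
        String normalized = sql.trim().toUpperCase();
        if (normalized.startsWith("SHOW FUNCTIONS LIKE")) {
            // Fail fast with a clear message, mirroring the behavior
            // described in this review thread.
            throw new UnsupportedOperationException(
                    "SHOW FUNCTIONS LIKE is not supported yet");
        }
        return "ShowFunctionsOperation";
    }

    public static void main(String[] args) {
        System.out.println(analyzeShowFunctions("SHOW FUNCTIONS"));
    }
}
```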

funcDefFactory.createFunctionDefinition(
desc.getFunctionName(),
new CatalogFunctionImpl(desc.getClassName(), FunctionLanguage.JAVA));
return new CreateTempSystemInlineFunctionOperation(
Member


Is it necessary to introduce a new operation CreateTempSystemInlineFunctionOperation? Can we use CreateTempSystemFunctionOperation and pass in the class name? Or can we unify them?

It's quite confusing what the difference between CreateTempSystemInlineFunctionOperation and CreateTempSystemFunctionOperation is.

@@ -308,7 +308,7 @@ public void testAlterTable() throws Exception {
// change location
String newLocation = warehouse + "/tbl1_new_location";
tableEnv.executeSql(
-                String.format("alter table `default`.tbl1 set location '%s'", newLocation));
+                String.format("alter table default.tbl1 set location '%s'", newLocation));
Member


Don't we support escaping anymore?

Contributor Author


We do. This is to verify that users no longer have to escape the default keyword in Hive dialect.

Member


Could you also add a test with escaping to make sure it also works?

`default`.tbl1
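Such a test would exercise backtick unquoting: `default`.tbl1 and default.tbl1 should resolve to the same table. Below is a minimal, self-contained sketch of how Hive-style backtick unquoting behaves; the helper class is hypothetical, not Flink's implementation.

```java
// Hypothetical sketch: strip backtick quoting from a Hive-style identifier part,
// so that `default`.tbl1 and default.tbl1 resolve to the same name.
public class IdentifierUnescape {

    static String unquote(String part) {
        if (part.length() >= 2 && part.startsWith("`") && part.endsWith("`")) {
            // Hive doubles embedded backticks inside quoted identifiers.
            return part.substring(1, part.length() - 1).replace("``", "`");
        }
        return part;
    }

    public static void main(String[] args) {
        System.out.println(unquote("`default`")); // default
        System.out.println(unquote("tbl1"));      // tbl1
    }
}
```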

@lirui-apache lirui-apache force-pushed the FLINK-21727 branch 2 times, most recently from 0df0c79 to e971a1d Compare March 15, 2021 13:17
@lirui-apache
Contributor Author

Thanks @wuchong for your review. I have updated the PR to address your comments.

Member

@wuchong wuchong left a comment


Thanks for the update. LGTM.

@wuchong wuchong merged commit 95d6278 into apache:master Mar 16, 2021
wuchong pushed a commit that referenced this pull request Mar 16, 2021
@lirui-apache lirui-apache deleted the FLINK-21727 branch March 16, 2021 03:57
@wangqinghuan

wangqinghuan commented Apr 2, 2021

I want to use Flink to connect to a Hive external table backed by Elasticsearch.
The following code doesn't create the Hive external table correctly after merging this PR into my fork.

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir     = args[0];
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE EXTERNAL TABLE IF NOT EXISTS test (\n" +
                "    id      BIGINT,\n" +
                "    name    STRING)\n" +
                "STORED by 'org.elasticsearch.hadoop.hive.EsStorageHandler'\n" +
                "TBLPROPERTIES(\n" +
                "'es.resource' = 'test',\n" +
                "'es.nodes' = '172.16.1.192:9200'\n" +
                ") ");
        tableEnv.executeSql("insert into test values(1,'test')");

Executing this code, Flink creates a generic Hive table rather than an external table for Elasticsearch. Do we support StorageHandler in the Flink Hive dialect? @lirui-apache

@lirui-apache
Contributor Author

Hey @wangqinghuan, thanks for trying out this feature and reporting the issue. Hive dialect doesn't support Hive's storage handler tables. That's mainly because the Hive connector currently doesn't support reading/writing such tables. So even if you manage to create such tables, you can't really use them in Flink. I think we'd better throw meaningful exceptions for these use cases.
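One way to surface such a meaningful exception is to reject STORED BY clauses up front during DDL analysis. The sketch below is a minimal, hypothetical illustration (a text-based check; Flink's actual validation would inspect the parsed AST), not the project's real code.

```java
// Hypothetical sketch: fail fast with a clear message when CREATE TABLE uses a
// Hive storage handler (STORED BY ...), since the Hive connector cannot
// read or write such tables.
public class StorageHandlerCheck {

    static void validateCreateTable(String sql) {
        if (sql.toUpperCase().contains("STORED BY")) {
            throw new UnsupportedOperationException(
                    "Hive storage handler tables (STORED BY ...) are not supported");
        }
    }

    public static void main(String[] args) {
        // A plain managed table passes validation.
        validateCreateTable("CREATE TABLE t (x INT) STORED AS ORC");
        System.out.println("plain table accepted");
    }
}
```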
