Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-14906][table] create and drop temp system functions from DDL t… #10484

Conversation

HuangZhenQiu
Copy link
Contributor

What is the purpose of the change

Support create and drop temp system functions from DDL to FunctionCatalog with function ddl.

Brief change log

  • Create and Drop temporary system functions for both stream and batch
  • Add test cases in CatalogFunctionITCase for both planner and blink planner

Verifying this change

The features is covered in CatalogFunctionITCases

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (o)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 8, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit c604c9f (Sun Dec 08 04:41:27 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 8, 2019

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

Copy link
Member

@bowenli86 bowenli86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because system functions and catalog functions are two different categories, I think 1) we need a Create/DropTempSystemFunctionOperation despite existing Create/DropTempFunctionOperation (which should be renamed to Create/DropCatalogFunctionOperation), 2) we shouldn't use CatalogFunction in the operation because of that, just use a string to store the function class should be fine.

public class CreateTempSystemFunctionOperation implements CreateOperation {
	private final String functionName;
	private String functionClass;
	private boolean ignoreIfExists;
...

public class DropTempSystemFunctionOperation implements DropOperation {
	private final String functionName;
	private final boolean ifExists;
	// private final boolean isTemporary; // no need for this, since ddl can only operate on temp system funtionc

* @param functionName the name of the function
* @return whether the temporary system function exists in the function catalog
*/
public boolean hasSystemCatalogFunction(String functionName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be hasTempSystemFunction

flink-python/pyflink/table/catalog.py Outdated Show resolved Hide resolved
flink-python/pyflink/table/tests/test_catalog.py Outdated Show resolved Hide resolved
@@ -33,16 +33,22 @@
private final String className; // Fully qualified class name of the function
private final FunctionLanguage functionLanguage;
private final boolean isTemporary;
private final boolean isSystem;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this in catalog function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

@HuangZhenQiu HuangZhenQiu force-pushed the FLINK-14906-support-temprary-system-function branch from c604c9f to 3679bff Compare December 8, 2019 07:54
@HuangZhenQiu
Copy link
Contributor Author

@bowenli86 Thanks for the feedback. I agree with your suggestion. We should distinguish system or catalog function from the very beginning. Thus, different types of operations should be created for system and catalog functions.

@HuangZhenQiu HuangZhenQiu force-pushed the FLINK-14906-support-temprary-system-function branch 5 times, most recently from 1f803a0 to 3151c7b Compare December 8, 2019 18:12
@HuangZhenQiu HuangZhenQiu force-pushed the FLINK-14906-support-temprary-system-function branch from 3151c7b to 4a37049 Compare December 8, 2019 18:23
Copy link
Member

@bowenli86 bowenli86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. tested locally

I will reformat the code for the following issues when merging

@@ -843,29 +878,65 @@ private void dropCatalogFunction(DropFunctionOperation dropFunctionOperation) {
.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

functionCatalog.registerTempCatalogAggregateFunction(
functionIdentifier,
aggregateFunction,
typeInfo,
accTypeInfo);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

} else if (functionDefinition instanceof TableFunctionDefinition) {
TableFunctionDefinition tableFunctionDefinition = (TableFunctionDefinition) functionDefinition;
TableFunction<T> tableFunction = (TableFunction<T>) tableFunctionDefinition.getTableFunction();
TypeInformation<T> typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tableFunction);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

protected TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

@bowenli86 bowenli86 closed this in 5bc914e Dec 8, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants