From 5bc914ec423bc9b072c51ce07bf8ff8614ae24b5 Mon Sep 17 00:00:00 2001 From: hpeter Date: Sat, 7 Dec 2019 09:41:42 -0800 Subject: [PATCH] [FLINK-14906][table] create and drop temp system functions from DDL to FunctionCatalog this closes #10484. --- .../sql/parser/ddl/SqlAlterFunction.java | 4 + .../flink/sql/parser/ddl/SqlDropFunction.java | 4 + .../api/internal/TableEnvironmentImpl.java | 107 +++++++++++++++--- .../table/catalog/CatalogFunctionImpl.java | 9 +- .../flink/table/catalog/FunctionCatalog.java | 12 +- .../functions/FunctionDefinitionUtil.java | 10 +- .../ddl/AlterFunctionOperation.java | 2 +- .../ddl/CreateFunctionOperation.java | 10 +- .../CreateTempSystemFunctionOperation.java | 70 ++++++++++++ .../operations/ddl/DropFunctionOperation.java | 10 +- .../ddl/DropTempSystemFunctionOperation.java | 63 +++++++++++ .../functions/FunctionDefinitionUtilTest.java | 9 +- .../operations/SqlToOperationConverter.java | 67 +++++++---- .../FunctionTestBase.java} | 74 +++++++++--- ...unctionITCase.java => FunctionITCase.java} | 7 +- ...unctionITCase.java => FunctionITCase.java} | 22 +++- .../sqlexec/SqlToOperationConverter.java | 60 +++++++--- .../table/api/internal/TableEnvImpl.scala | 106 ++++++++++++++--- .../FunctionTestBase.java} | 97 ++++++++++++---- ...unctionITCase.java => FunctionITCase.java} | 7 +- ...unctionITCase.java => FunctionITCase.java} | 7 +- 21 files changed, 610 insertions(+), 147 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTempSystemFunctionOperation.java rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/{catalog/CatalogFunctionTestBase.java => functions/FunctionTestBase.java} (81%) rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/{CatalogFunctionITCase.java => FunctionITCase.java} (83%) rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/{CatalogFunctionITCase.java => FunctionITCase.java} (81%) rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/{catalog/CatalogFunctionTestBase.java => functions/FunctionTestBase.java} (79%) rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/{CatalogFunctionITCase.java => FunctionITCase.java} (84%) rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/{CatalogFunctionITCase.java => FunctionITCase.java} (85%) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java index 930657837b572..a896247f01b26 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java @@ -118,6 +118,10 @@ public boolean isTemporary() { return isTemporary; } + public boolean isSystemFunction() { + return isSystemFunction; + } + public boolean isIfExists() { return this.ifExists; } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropFunction.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropFunction.java index 
2688f58eb293a..38b0289c34f87 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropFunction.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropFunction.java @@ -96,6 +96,10 @@ public boolean isTemporary() { return isTemporary; } + public boolean isSystemFunction() { + return isSystemFunction; + } + public boolean getIfExists() { return this.ifExists; } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index e7d33ad24c30e..af1d47cc02bc9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -84,9 +84,11 @@ import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateFunctionOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropFunctionOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; @@ -571,12 +573,20 @@ public void sqlUpdate(String stmt) { } else if (operation instanceof CreateFunctionOperation) { CreateFunctionOperation createFunctionOperation = (CreateFunctionOperation) operation; createCatalogFunction(createFunctionOperation); + } else if (operation instanceof CreateTempSystemFunctionOperation) { + CreateTempSystemFunctionOperation createtempSystemFunctionOperation = + (CreateTempSystemFunctionOperation) operation; + createSystemFunction(createtempSystemFunctionOperation); } else if (operation instanceof AlterFunctionOperation) { AlterFunctionOperation alterFunctionOperation = (AlterFunctionOperation) operation; alterCatalogFunction(alterFunctionOperation); } else if (operation instanceof DropFunctionOperation) { DropFunctionOperation dropFunctionOperation = (DropFunctionOperation) operation; dropCatalogFunction(dropFunctionOperation); + } else if (operation instanceof DropTempSystemFunctionOperation) { + DropTempSystemFunctionOperation dropTempSystemFunctionOperation = + (DropTempSystemFunctionOperation) operation; + dropSystemFunction(dropTempSystemFunctionOperation); } else if (operation instanceof UseCatalogOperation) { UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); @@ -746,11 +756,9 @@ private void createCatalogFunction(CreateFunctionOperation createFunctionOperati createFunctionOperation.getFunctionIdentifier()); if (!exist) { FunctionDefinition functionDefinition = FunctionDefinitionUtil.createFunctionDefinition( - createFunctionOperation.getFunctionName(), - function); - registerFunctionInFunctionCatalog( - createFunctionOperation.getFunctionIdentifier(), - functionDefinition); + createFunctionOperation.getFunctionName(), function.getClassName()); + registerCatalogFunctionInFunctionCatalog( + 
createFunctionOperation.getFunctionIdentifier(), functionDefinition); } else if (!createFunctionOperation.isIgnoreIfExists()) { throw new ValidationException( String.format("Temporary catalog function %s is already defined", @@ -802,17 +810,9 @@ private void dropCatalogFunction(DropFunctionOperation dropFunctionOperation) { String exMsg = getDDLOpExecuteErrorMsg(dropFunctionOperation.asSummaryString()); try { if (dropFunctionOperation.isTemporary()) { - boolean exist = functionCatalog.hasTemporaryCatalogFunction( - dropFunctionOperation.getFunctionIdentifier()); - if (exist) { - functionCatalog.dropTempCatalogFunction( - dropFunctionOperation.getFunctionIdentifier(), - dropFunctionOperation.isIfExists()); - } else if (!dropFunctionOperation.isIfExists()) { - throw new ValidationException( - String.format("Temporary catalog function %s is not found", - dropFunctionOperation.getFunctionIdentifier().asSerializableString())); - } + functionCatalog.dropTempCatalogFunction( + dropFunctionOperation.getFunctionIdentifier(), + dropFunctionOperation.isIfExists()); } else { Catalog catalog = getCatalogOrThrowException (dropFunctionOperation.getFunctionIdentifier().getCatalogName()); @@ -830,11 +830,47 @@ private void dropCatalogFunction(DropFunctionOperation dropFunctionOperation) { } } - private void registerFunctionInFunctionCatalog( + private void createSystemFunction(CreateTempSystemFunctionOperation operation) { + String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); + try { + boolean exist = functionCatalog.hasTemporarySystemFunction(operation.getFunctionName()); + if (!exist) { + FunctionDefinition functionDefinition = FunctionDefinitionUtil.createFunctionDefinition( + operation.getFunctionName(), + operation.getFunctionClass()); + registerSystemFunctionInFunctionCatalog(operation.getFunctionName(), functionDefinition); + + } else if (!operation.isIgnoreIfExists()) { + throw new ValidationException( + String.format("Temporary system function %s is already defined", + operation.getFunctionName())); + } + } catch (ValidationException e) { + throw e; + } catch (Exception e) { + throw new TableException(exMsg, e); + } + } + + private void dropSystemFunction(DropTempSystemFunctionOperation operation) { + String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); + try { + functionCatalog.dropTempSystemFunction( + operation.getFunctionName(), + operation.isIfExists()); + } catch (ValidationException e) { + throw e; + } catch (Exception e) { + throw new TableException(exMsg, e); + } + } + + private void registerCatalogFunctionInFunctionCatalog( ObjectIdentifier functionIdentifier, FunctionDefinition functionDefinition) { if (functionDefinition instanceof ScalarFunctionDefinition) { ScalarFunctionDefinition scalarFunction = (ScalarFunctionDefinition) functionDefinition; - functionCatalog.registerTempCatalogScalarFunction(functionIdentifier, scalarFunction.getScalarFunction()); + functionCatalog.registerTempCatalogScalarFunction( + functionIdentifier, scalarFunction.getScalarFunction()); } else if (functionDefinition instanceof AggregateFunctionDefinition) { AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition; AggregateFunction aggregateFunction = @@ -861,6 +897,41 @@ private void registerFunctionInFunctionCatalog( } } + private void registerSystemFunctionInFunctionCatalog( + String functionName, FunctionDefinition functionDefinition) { + + if (functionDefinition instanceof ScalarFunctionDefinition) { + 
ScalarFunctionDefinition scalarFunction = (ScalarFunctionDefinition) functionDefinition; + functionCatalog.registerTempSystemScalarFunction( + functionName, scalarFunction.getScalarFunction()); + } else if (functionDefinition instanceof AggregateFunctionDefinition) { + AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition; + AggregateFunction aggregateFunction = + (AggregateFunction) aggregateFunctionDefinition.getAggregateFunction(); + TypeInformation typeInfo = UserFunctionsTypeHelper + .getReturnTypeOfAggregateFunction(aggregateFunction); + TypeInformation accTypeInfo = UserFunctionsTypeHelper + .getAccumulatorTypeOfAggregateFunction(aggregateFunction); + functionCatalog.registerTempSystemAggregateFunction( + functionName, + aggregateFunction, + typeInfo, + accTypeInfo); + + } else if (functionDefinition instanceof TableFunctionDefinition) { + TableFunctionDefinition tableFunctionDefinition = (TableFunctionDefinition) functionDefinition; + TableFunction tableFunction = (TableFunction) tableFunctionDefinition.getTableFunction(); + TypeInformation typeInfo = UserFunctionsTypeHelper + .getReturnTypeOfTableFunction(tableFunction); + + functionCatalog.registerTempSystemTableFunction( + functionName, + tableFunction, + typeInfo); + } + + } + protected TableImpl createTable(QueryOperation tableOperation) { return TableImpl.createTable( this, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java index 4deb7ad017791..54aceadc1f186 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java @@ -38,7 +38,10 @@ public CatalogFunctionImpl(String className) { this(className, FunctionLanguage.JAVA, false); } - public CatalogFunctionImpl(String className, FunctionLanguage functionLanguage, boolean isTemporary) { + public CatalogFunctionImpl( + String className, + FunctionLanguage functionLanguage, + boolean isTemporary) { checkArgument(!StringUtils.isNullOrWhitespaceOnly(className), "className cannot be null or empty"); this.className = className; this.functionLanguage = checkNotNull(functionLanguage, "functionLanguage cannot be null"); @@ -92,8 +95,8 @@ public FunctionLanguage getFunctionLanguage() { public String toString() { return "CatalogFunctionImpl{" + "className='" + getClassName() + "', " + - "functionLanguage='" + getFunctionLanguage() + - "isGeneric='" + isGeneric() + + "functionLanguage='" + getFunctionLanguage() + "', " + + "isGeneric='" + isGeneric() + "', " + "isTemporary='" + isTemporary() + "'}"; } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index cf0e8054c04a6..6658fb2dde5ee 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -204,6 +204,15 @@ public boolean hasTemporaryCatalogFunction(ObjectIdentifier functionIdentifier) return tempCatalogFunctions.containsKey(normalizedIdentifier); } + /** + * Check whether a temporary system function is already registered. 
+ * @param functionName the name of the function + * @return whether the temporary system function exists in the function catalog + */ + public boolean hasTemporarySystemFunction(String functionName) { + return tempSystemFunctions.containsKey(functionName); + } + /** * Drop a temporary system function. * @@ -328,7 +337,8 @@ private Optional resolvePreciseFunctionReference(ObjectId fd = catalog.getFunctionDefinitionFactory().get() .createFunctionDefinition(oi.getObjectName(), catalogFunction); } else { - fd = FunctionDefinitionUtil.createFunctionDefinition(oi.getObjectName(), catalogFunction); + fd = FunctionDefinitionUtil.createFunctionDefinition( + oi.getObjectName(), catalogFunction.getClassName()); } return Optional.of( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java index 8eacd56527941..da798b6951b46 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java @@ -18,21 +18,19 @@ package org.apache.flink.table.functions; -import org.apache.flink.table.catalog.CatalogFunction; - /** * A util to instantiate {@link FunctionDefinition} in the default way. */ public class FunctionDefinitionUtil { - public static FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) { + public static FunctionDefinition createFunctionDefinition(String name, String className) { // Currently only handles Java class-based functions Object func; try { - func = Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance(); + func = Thread.currentThread().getContextClassLoader().loadClass(className).newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { throw new IllegalStateException( - String.format("Failed instantiating '%s'", catalogFunction.getClassName()), e); + String.format("Failed instantiating '%s'", className), e); } UserDefinedFunction udf = (UserDefinedFunction) func; @@ -69,7 +67,7 @@ public static FunctionDefinition createFunctionDefinition(String name, CatalogFu ); } else { throw new UnsupportedOperationException( - String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", catalogFunction.getClassName()) + String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", className) ); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterFunctionOperation.java index 865034618c01b..fab7e54b02fcf 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterFunctionOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterFunctionOperation.java @@ -28,7 +28,7 @@ import java.util.Map; /** - * Operation to describe a ALTER FUNCTION statement. + * Operation to describe a ALTER FUNCTION statement for temporary catalog function. 
*/ public class AlterFunctionOperation implements AlterOperation { private final ObjectIdentifier functionIdentifier; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateFunctionOperation.java index 183044d0e6753..cc8e281eb1f08 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateFunctionOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateFunctionOperation.java @@ -28,7 +28,7 @@ import java.util.Map; /** - * Operation to describe a CREATE FUNCTION statement. + * Operation to describe a CREATE FUNCTION statement for catalog function. */ public class CreateFunctionOperation implements CreateOperation { private final ObjectIdentifier functionIdentifier; @@ -36,9 +36,9 @@ public class CreateFunctionOperation implements CreateOperation { private boolean ignoreIfExists; public CreateFunctionOperation( - ObjectIdentifier functionIdentifier, - CatalogFunction catalogFunction, - boolean ignoreIfExists) { + ObjectIdentifier functionIdentifier, + CatalogFunction catalogFunction, + boolean ignoreIfExists) { this.functionIdentifier = functionIdentifier; this.catalogFunction = catalogFunction; this.ignoreIfExists = ignoreIfExists; @@ -64,7 +64,7 @@ public String asSummaryString() { params.put("ignoreIfExists", ignoreIfExists); return OperationUtils.formatWithChildren( - "CREATE FUNCTION", + "CREATE CATALOG FUNCTION", params, Collections.emptyList(), Operation::asSummaryString); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java new file mode 100644 index 0000000000000..2b67d92e87827 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTempSystemFunctionOperation.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Operation to describe a CREATE FUNCTION statement for temporary system function. 
+ */ +public class CreateTempSystemFunctionOperation implements CreateOperation { + private final String functionName; + private String functionClass; + private boolean ignoreIfExists; + + public CreateTempSystemFunctionOperation( + String functionName, + String functionClass, + boolean ignoreIfExists) { + this.functionName = functionName; + this.functionClass = functionClass; + this.ignoreIfExists = ignoreIfExists; + } + + public String getFunctionName() { + return this.functionName; + } + + public String getFunctionClass() { + return this.functionClass; + } + + public boolean isIgnoreIfExists() { + return this.ignoreIfExists; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("functionName", functionName); + params.put("functionClass", functionClass); + params.put("ignoreIfExists", ignoreIfExists); + + return OperationUtils.formatWithChildren( + "CREATE TEMPORARY SYSTEM FUNCTION", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropFunctionOperation.java index c7bdf2012b5af..dae39d45718b8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropFunctionOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropFunctionOperation.java @@ -27,20 +27,23 @@ import java.util.Map; /** - * Operation to describe a DROP FUNCTION statement. + * Operation to describe a DROP FUNCTION statement for catalog function. */ public class DropFunctionOperation implements DropOperation { private final ObjectIdentifier functionIdentifier; private final boolean ifExists; private final boolean isTemporary; + private final boolean isSystemFunction; public DropFunctionOperation( ObjectIdentifier functionIdentifier, boolean isTemporary, + boolean isSystemFunction, boolean ifExists) { this.functionIdentifier = functionIdentifier; this.ifExists = ifExists; this.isTemporary = isTemporary; + this.isSystemFunction = isSystemFunction; } public ObjectIdentifier getFunctionIdentifier() { @@ -56,6 +59,7 @@ public String asSummaryString() { Map params = new LinkedHashMap<>(); params.put("identifier", functionIdentifier); params.put("ifExists", ifExists); + params.put("isSystemFunction", isSystemFunction); params.put("isTemporary", isTemporary); return OperationUtils.formatWithChildren( @@ -69,6 +73,10 @@ public boolean isTemporary() { return isTemporary; } + public boolean isSystemFunction() { + return isSystemFunction; + } + public String getFunctionName() { return this.functionIdentifier.getObjectName(); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTempSystemFunctionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTempSystemFunctionOperation.java new file mode 100644 index 0000000000000..b00d1893d97b5 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTempSystemFunctionOperation.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Operation to describe a DROP FUNCTION statement for temporary + * system function. + */ +public class DropTempSystemFunctionOperation implements DropOperation { + private final String functionName; + private final boolean ifExists; + + public DropTempSystemFunctionOperation( + String functionName, + boolean ifExists) { + this.functionName = functionName; + this.ifExists = ifExists; + } + + public String getFunctionName() { + return functionName; + } + + public boolean isIfExists() { + return ifExists; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("functionName", functionName); + params.put("ifExists", ifExists); + + return OperationUtils.formatWithChildren( + "DROP TEMPORARY SYSTEM FUNCTION", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java index 89559555ccd60..b37ed8be1638a 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java @@ -19,7 +19,6 @@ package org.apache.flink.table.functions; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.junit.Test; @@ -33,7 +32,7 @@ public class FunctionDefinitionUtilTest { public void testScalarFunction() { FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( "test", - new CatalogFunctionImpl(TestScalarFunction.class.getName()) + TestScalarFunction.class.getName() ); assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() instanceof TestScalarFunction); @@ -43,7 +42,7 @@ public void testScalarFunction() { public void testTableFunction() { FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( "test", - new CatalogFunctionImpl(TestTableFunction.class.getName()) + TestTableFunction.class.getName() ); assertTrue(((TableFunctionDefinition) fd).getTableFunction() instanceof TestTableFunction); @@ -53,7 +52,7 @@ public void testTableFunction() { public void testAggregateFunction() { FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( "test", - new CatalogFunctionImpl(TestAggFunction.class.getName()) + TestAggFunction.class.getName() ); assertTrue(((AggregateFunctionDefinition) fd).getAggregateFunction() instanceof TestAggFunction); @@ -63,7 +62,7 @@ public void 
testAggregateFunction() { public void testTableAggregateFunction() { FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition( "test", - new CatalogFunctionImpl(TestTableAggFunction.class.getName()) + TestTableAggFunction.class.getName() ); assertTrue(((TableAggregateFunctionDefinition) fd).getTableAggregateFunction() instanceof TestTableAggFunction); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index ea5675aee329d..fab5feeb25955 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -60,9 +60,11 @@ import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateFunctionOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropFunctionOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -239,25 +241,42 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) { /** Convert CREATE FUNCTION statement. */ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) { - FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); - CatalogFunction catalogFunction = new CatalogFunctionImpl( - sqlCreateFunction.getFunctionClassName().getValueAs(String.class), language, sqlCreateFunction.isTemporary()); - - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - - return new CreateFunctionOperation( - identifier, - catalogFunction, - sqlCreateFunction.isIfNotExists() - ); + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier()); + + if (sqlCreateFunction.isSystemFunction()) { + return new CreateTempSystemFunctionOperation( + unresolvedIdentifier.getObjectName(), + sqlCreateFunction.getFunctionClassName().getValueAs(String.class), + sqlCreateFunction.isIfNotExists() + ); + } else { + FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); + CatalogFunction catalogFunction = new CatalogFunctionImpl( + sqlCreateFunction.getFunctionClassName().getValueAs(String.class), + language, + sqlCreateFunction.isTemporary()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new CreateFunctionOperation( + identifier, + catalogFunction, + sqlCreateFunction.isIfNotExists() + ); + } } /** Convert ALTER FUNCTION statement. 
*/ private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) { + if (sqlAlterFunction.isSystemFunction()) { + throw new ValidationException("Alter temporary system function is not supported"); + } + FunctionLanguage language = parseLanguage(sqlAlterFunction.getFunctionLanguage()); CatalogFunction catalogFunction = new CatalogFunctionImpl( - sqlAlterFunction.getFunctionClassName().getValueAs(String.class), language, sqlAlterFunction.isTemporary()); + sqlAlterFunction.getFunctionClassName().getValueAs(String.class), + language, + sqlAlterFunction.isTemporary()); UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlAlterFunction.getFunctionIdentifier()); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); @@ -271,13 +290,21 @@ private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) { /** Convert DROP FUNCTION statement. */ private Operation convertDropFunction(SqlDropFunction sqlDropFunction) { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlDropFunction.getFunctionIdentifier()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - - return new DropFunctionOperation( - identifier, - sqlDropFunction.isTemporary(), - sqlDropFunction.getIfExists() - ); + if (sqlDropFunction.isSystemFunction()) { + return new DropTempSystemFunctionOperation( + unresolvedIdentifier.getObjectName(), + sqlDropFunction.getIfExists() + ); + } else { + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new DropFunctionOperation( + identifier, + sqlDropFunction.isTemporary(), + sqlDropFunction.isSystemFunction(), + sqlDropFunction.getIfExists() + ); + } } /** diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogFunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java similarity index 81% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogFunctionTestBase.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java index a6413217b5902..e788df93997df 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogFunctionTestBase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.catalog; +package org.apache.flink.table.planner.functions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.ValidationException; @@ -38,7 +38,7 @@ /** * Tests for {@link CatalogFunction}. 
*/ -public abstract class CatalogFunctionTestBase { +public abstract class FunctionTestBase { protected static TableEnvironment tableEnv; @Test @@ -92,7 +92,7 @@ public void testCreateFunctionDBNotExists() { try { tableEnv.sqlUpdate(ddl1); } catch (Exception e){ - assertEquals(e.getMessage(), "Could not execute CREATE FUNCTION:" + + assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); } @@ -101,10 +101,10 @@ public void testCreateFunctionDBNotExists() { @Test public void testCreateTemporaryCatalogFunction() { String ddl1 = "create temporary function default_catalog.default_database.f4" + - " as 'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + - " as 'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String ddl3 = "drop temporary function default_catalog.default_database.f4"; @@ -135,10 +135,25 @@ public void testCreateTemporaryCatalogFunction() { } catch (Exception e) { assertTrue(e instanceof ValidationException); assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4` is not found"); + "Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist"); } } + @Test + public void testCreateTemporarySystemFunction() { + String ddl1 = "create temporary system function default_catalog.default_database.f5" + + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + + String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + + String ddl3 = "drop temporary system function default_catalog.default_database.f5"; + + tableEnv.sqlUpdate(ddl1); + tableEnv.sqlUpdate(ddl2); + tableEnv.sqlUpdate(ddl3); + } + @Test public void testAlterFunction() throws Exception { String create = "create function f3 as 'org.apache.flink.function.TestFunction'"; @@ -191,7 +206,7 @@ public void testAlterFunctionNonExists() { } @Test - public void testAlterTemporaryFunction() { + public void testAlterTemporaryCatalogFunction() { String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" + " as 'org.apache.flink.function.TestFunction'"; @@ -204,6 +219,19 @@ public void testAlterTemporaryFunction() { } + @Test + public void testAlterTemporarySystemFunction() { + String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(alterTemporary); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().equals("Alter temporary system function is not supported")); + } + } + @Test public void testDropFunctionNonExists() { String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4"; @@ -246,7 +274,7 @@ public void testDropTemporaryFunctionNonExits() { fail(); } catch (Exception e){ assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4` is not found"); + "Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist"); } try { @@ -254,21 
+282,21 @@ public void testDropTemporaryFunctionNonExits() { fail(); } catch (Exception e) { assertEquals(e.getMessage(), - "Temporary catalog function `catalog1`.`default_database`.`f4` is not found"); + "Temporary catalog function `catalog1`.`default_database`.`f4` doesn't exist"); } try { tableEnv.sqlUpdate(dropFunctionInWrongDB); fail(); } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary catalog function `default_catalog`.`db1`.`f4` is not found"); + assertEquals(e.getMessage(), "Temporary catalog function `default_catalog`.`db1`.`f4` doesn't exist"); } } @Test public void testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier() { String createNoCatalogDB = "create temporary function f4" + - " as 'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String dropNoCatalogDB = "drop temporary function f4"; @@ -276,7 +304,7 @@ public void testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier( tableEnv.sqlUpdate(dropNoCatalogDB); String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" + - " as 'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4"; @@ -284,7 +312,7 @@ public void testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier( tableEnv.sqlUpdate(dropNonExistsCatalog); String createNonExistsDB = "create temporary function default_catalog.db1.f4" + - " as 'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String dropNonExistsDB = "drop temporary function default_catalog.db1.f4"; @@ -292,6 +320,26 @@ public void testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier( tableEnv.sqlUpdate(dropNonExistsDB); } + @Test + public void testDropTemporarySystemFunction() { + String ddl1 = "create temporary system function f5" + + " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + + String ddl2 = "drop temporary system function f5"; + + String ddl3 = "drop temporary system function if exists f5"; + + tableEnv.sqlUpdate(ddl1); + tableEnv.sqlUpdate(ddl2); + tableEnv.sqlUpdate(ddl3); + + try { + tableEnv.sqlUpdate(ddl2); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist"); + } + } + protected Row toRow(Object ... 
objects) { Row row = new Row(objects.length); for (int i = 0; i < objects.length; i++) { diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CatalogFunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java similarity index 83% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CatalogFunctionITCase.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java index 73b39e00fa189..a43c36a511186 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CatalogFunctionITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java @@ -19,17 +19,16 @@ package org.apache.flink.table.planner.runtime.batch.sql; import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.planner.catalog.CatalogFunctionTestBase; +import org.apache.flink.table.planner.functions.FunctionTestBase; import org.apache.flink.table.planner.utils.TestingTableEnvironment; import org.junit.BeforeClass; /** - * Tests for {@link CatalogFunction} in batch table environment. + * Tests for catalog and system functions in batch table environment. */ public class -CatalogFunctionITCase extends CatalogFunctionTestBase { +FunctionITCase extends FunctionTestBase { @BeforeClass public static void setup() { diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CatalogFunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java similarity index 81% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CatalogFunctionITCase.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java index 4cf6c79b3276c..8aa8d68b2e542 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CatalogFunctionITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java @@ -20,9 +20,8 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.planner.catalog.CatalogFunctionTestBase; import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.functions.FunctionTestBase; import org.apache.flink.table.planner.utils.TestingTableEnvironment; import org.apache.flink.types.Row; @@ -36,9 +35,9 @@ import static org.junit.Assert.assertArrayEquals; /** - * Tests for {@link CatalogFunction} in stream table environment. + * Tests for catalog and system in stream table environment. 
*/ -public class CatalogFunctionITCase extends CatalogFunctionTestBase { +public class FunctionITCase extends FunctionTestBase { @BeforeClass public static void setup() { @@ -50,7 +49,7 @@ public static void setup() { @Test public void testUseDefinedRegularCatalogFunction() throws Exception { String functionDDL = "create function addOne as " + - "'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String dropFunctionDDL = "drop function addOne"; testUseDefinedCatalogFunction(functionDDL); @@ -61,7 +60,7 @@ public void testUseDefinedRegularCatalogFunction() throws Exception { @Test public void testUseDefinedTemporaryCatalogFunction() throws Exception { String functionDDL = "create temporary function addOne as " + - "'org.apache.flink.table.planner.catalog.CatalogFunctionTestBase$TestUDF'"; + "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; String dropFunctionDDL = "drop temporary function addOne"; testUseDefinedCatalogFunction(functionDDL); @@ -69,6 +68,17 @@ public void testUseDefinedTemporaryCatalogFunction() throws Exception { tableEnv.sqlUpdate(dropFunctionDDL); } + @Test + public void testUseDefinedTemporarySystemFunction() throws Exception { + String functionDDL = "create temporary system function addOne as " + + "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + + String dropFunctionDDL = "drop temporary system function addOne"; + testUseDefinedCatalogFunction(functionDDL); + // delete the function + tableEnv.sqlUpdate(dropFunctionDDL); + } + // This test case only works for stream mode private void testUseDefinedCatalogFunction(String createFunctionDDL) throws Exception { List sourceData = Arrays.asList( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index dc9f5b30d4e9f..47b758a760daf 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -63,9 +63,11 @@ import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateFunctionOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropFunctionOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.util.StringUtils; @@ -245,24 +247,37 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) { /** Convert CREATE FUNCTION statement. 
*/ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) { - FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); - CatalogFunction catalogFunction = new CatalogFunctionImpl( - sqlCreateFunction.getFunctionClassName().getValueAs(String.class), - language, - sqlCreateFunction.isTemporary()); - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateFunction.getFunctionIdentifier()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - return new CreateFunctionOperation( - identifier, - catalogFunction, - sqlCreateFunction.isIfNotExists() - ); + if (sqlCreateFunction.isSystemFunction()) { + return new CreateTempSystemFunctionOperation( + unresolvedIdentifier.getObjectName(), + sqlCreateFunction.getFunctionClassName().getValueAs(String.class), + sqlCreateFunction.isIfNotExists() + ); + } else { + FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); + CatalogFunction catalogFunction = new CatalogFunctionImpl( + sqlCreateFunction.getFunctionClassName().getValueAs(String.class), + language, + sqlCreateFunction.isTemporary()); + + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new CreateFunctionOperation( + identifier, + catalogFunction, + sqlCreateFunction.isIfNotExists() + ); + } } /** Convert ALTER FUNCTION statement. */ private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) { + if (sqlAlterFunction.isSystemFunction()) { + throw new ValidationException("Alter temporary system function is not supported"); + } + FunctionLanguage language = parseLanguage(sqlAlterFunction.getFunctionLanguage()); CatalogFunction catalogFunction = new CatalogFunctionImpl( sqlAlterFunction.getFunctionClassName().getValueAs(String.class), @@ -281,12 +296,21 @@ private Operation convertAlterFunction(SqlAlterFunction sqlAlterFunction) { /** Convert DROP FUNCTION statement. */ private Operation convertDropFunction(SqlDropFunction sqlDropFunction) { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlDropFunction.getFunctionIdentifier()); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - - return new DropFunctionOperation( - identifier, - sqlDropFunction.isTemporary(), - sqlDropFunction.getIfExists()); + if (sqlDropFunction.isSystemFunction()) { + return new DropTempSystemFunctionOperation( + unresolvedIdentifier.getObjectName(), + sqlDropFunction.getIfExists() + ); + } else { + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new DropFunctionOperation( + identifier, + sqlDropFunction.isTemporary(), + sqlDropFunction.isSystemFunction(), + sqlDropFunction.getIfExists() + ); + } } /** Fallback method for sql query. 
*/ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index c1edec9660ef1..51e86cb67699c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -547,11 +547,15 @@ abstract class TableEnvImpl( case ex: Exception => throw new TableException(exMsg, ex) } case createFunctionOperation: CreateFunctionOperation => - createCatalogFunction(createFunctionOperation) + createCatalogFunction(createFunctionOperation) + case createTempSystemFunctionOperation: CreateTempSystemFunctionOperation => + createSystemFunction(createTempSystemFunctionOperation) case alterFunctionOperation: AlterFunctionOperation => - alterCatalogFunction(alterFunctionOperation) + alterCatalogFunction(alterFunctionOperation) case dropFunctionOperation: DropFunctionOperation => - dropCatalogFunction(dropFunctionOperation) + dropCatalogFunction(dropFunctionOperation) + case dropTempSystemFunctionOperation: DropTempSystemFunctionOperation => + dropSystemFunction(dropTempSystemFunctionOperation) case useCatalogOperation: UseCatalogOperation => catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName) case useDatabaseOperation: UseDatabaseOperation => @@ -704,8 +708,8 @@ abstract class TableEnvImpl( createFunctionOperation.getFunctionIdentifier); if (!exist) { val functionDefinition = FunctionDefinitionUtil.createFunctionDefinition( - createFunctionOperation.getFunctionName, function) - registerFunctionInFunctionCatalog( + createFunctionOperation.getFunctionName, function.getClassName) + registerCatalogFunctionInFunctionCatalog( createFunctionOperation.getFunctionIdentifier, functionDefinition) } else if (!createFunctionOperation.isIgnoreIfExists) { @@ -753,16 +757,8 @@ abstract class TableEnvImpl( val exMsg = getDDLOpExecuteErrorMsg(dropFunctionOperation.asSummaryString) try { if (dropFunctionOperation.isTemporary) { - - val exist = functionCatalog.hasTemporaryCatalogFunction( - dropFunctionOperation.getFunctionIdentifier) - if (exist) { functionCatalog.dropTempCatalogFunction( dropFunctionOperation.getFunctionIdentifier, dropFunctionOperation.isIfExists) - } else if (!dropFunctionOperation.isIfExists) { - throw new ValidationException(String.format("Temporary catalog function %s is not found", - dropFunctionOperation.getFunctionIdentifier.asSerializableString)) - } } else { val catalog = getCatalogOrThrowException( dropFunctionOperation.getFunctionIdentifier.getCatalogName) @@ -777,16 +773,54 @@ abstract class TableEnvImpl( } } - private def registerFunctionInFunctionCatalog[T, ACC]( + private def createSystemFunction( + createFunctionOperation: CreateTempSystemFunctionOperation): Unit = { + val exMsg = getDDLOpExecuteErrorMsg(createFunctionOperation.asSummaryString) + try { + val exist = functionCatalog.hasTemporarySystemFunction( + createFunctionOperation.getFunctionName) + if (!exist) { + val functionDefinition = FunctionDefinitionUtil.createFunctionDefinition( + createFunctionOperation.getFunctionName, createFunctionOperation.getFunctionClass) + registerSystemFunctionInFunctionCatalog( + createFunctionOperation.getFunctionName, + functionDefinition) + } else if (!createFunctionOperation.isIgnoreIfExists) { + throw new ValidationException( + String.format("Temporary system function %s is 
already defined", + createFunctionOperation.getFunctionName)) + } + } catch { + case e: ValidationException => + throw e + case e: Exception => + throw new TableException(exMsg, e) + } + } + + private def dropSystemFunction(dropFunctionOperation: DropTempSystemFunctionOperation): Unit = { + val exMsg = getDDLOpExecuteErrorMsg(dropFunctionOperation.asSummaryString) + try { + functionCatalog.dropTempSystemFunction( + dropFunctionOperation.getFunctionName, dropFunctionOperation.isIfExists) + } catch { + case e: ValidationException => + throw e + case e: Exception => + throw new TableException(exMsg, e) + } + } + + private def registerCatalogFunctionInFunctionCatalog[T, ACC]( functionIdentifier: ObjectIdentifier, functionDefinition: FunctionDefinition): Unit = { + if (functionDefinition.isInstanceOf[ScalarFunctionDefinition]) { val scalarFunctionDefinition = functionDefinition.asInstanceOf[ScalarFunctionDefinition] functionCatalog.registerTempCatalogScalarFunction( functionIdentifier, scalarFunctionDefinition.getScalarFunction) - } - else if (functionDefinition.isInstanceOf[AggregateFunctionDefinition]) { + } else if (functionDefinition.isInstanceOf[AggregateFunctionDefinition]) { val aggregateFunctionDefinition = functionDefinition.asInstanceOf[AggregateFunctionDefinition] val aggregateFunction = aggregateFunctionDefinition .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]] @@ -798,12 +832,48 @@ abstract class TableEnvImpl( aggregateFunction, typeInfo, accTypeInfo) + + } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) { + val tableFunctionDefinition = functionDefinition.asInstanceOf[TableFunctionDefinition] + val tableFunction = tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]] + val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction) + functionCatalog.registerTempCatalogTableFunction( + functionIdentifier, + tableFunction, + typeInfo) + } - else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) { + } + + private def registerSystemFunctionInFunctionCatalog[T, ACC]( + functionName: String, + functionDefinition: FunctionDefinition): Unit = { + if (functionDefinition.isInstanceOf[ScalarFunctionDefinition]) { + val scalarFunctionDefinition = functionDefinition.asInstanceOf[ScalarFunctionDefinition] + functionCatalog.registerTempSystemScalarFunction( + functionName, + scalarFunctionDefinition.getScalarFunction) + } else if (functionDefinition.isInstanceOf[AggregateFunctionDefinition]) { + val aggregateFunctionDefinition = functionDefinition.asInstanceOf[AggregateFunctionDefinition] + val aggregateFunction = aggregateFunctionDefinition + .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]] + val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction) + val accTypeInfo = UserFunctionsTypeHelper + .getAccumulatorTypeOfAggregateFunction(aggregateFunction) + functionCatalog.registerTempSystemAggregateFunction( + functionName, + aggregateFunction, + typeInfo, + accTypeInfo) + } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) { val tableFunctionDefinition = functionDefinition.asInstanceOf[TableFunctionDefinition] val tableFunction = tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]] val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction) - functionCatalog.registerTempCatalogTableFunction(functionIdentifier, tableFunction, typeInfo) + functionCatalog.registerTempSystemTableFunction( + functionName, + 
tableFunction, + typeInfo) + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogFunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java similarity index 79% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogFunctionTestBase.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java index 7872da3caed37..5815da6599163 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogFunctionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java @@ -16,12 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.catalog; +package org.apache.flink.table.functions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.factories.utils.TestCollectionTableFactory; -import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; import org.junit.Test; @@ -37,9 +39,9 @@ import static org.junit.Assert.fail; /** - * Tests for {@link CatalogFunction}. + * Tests for both catalog and system function. */ -public abstract class CatalogFunctionTestBase { +public abstract class FunctionTestBase { private static TableEnvironment tableEnv; public static void setTableEnv(TableEnvironment e) { @@ -99,7 +101,7 @@ public void testCreateFunctionDBNotExists() { try { tableEnv.sqlUpdate(ddl1); } catch (Exception e){ - assertEquals(e.getMessage(), "Could not execute CREATE FUNCTION:" + + assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); } @@ -108,10 +110,10 @@ public void testCreateFunctionDBNotExists() { @Test public void testCreateTemporaryCatalogFunction() { String ddl1 = "create temporary function default_catalog.default_database.f4" + - " as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + - " as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'"; + " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; String ddl3 = "drop temporary function default_catalog.default_database.f4"; @@ -144,10 +146,25 @@ public void testCreateTemporaryCatalogFunction() { assertTrue(e instanceof ValidationException); assertEquals(e.getMessage(), "Temporary catalog function `default_catalog`.`default_database`.`f4`" + - " is not found"); + " doesn't exist"); } } + @Test + public void testCreateTemporarySystemFunction() { + String ddl1 = "create temporary system function default_catalog.default_database.f5" + + " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; + + String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + + " as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'"; + + String ddl3 = "drop temporary system function default_catalog.default_database.f5"; + + tableEnv.sqlUpdate(ddl1); 
+		tableEnv.sqlUpdate(ddl2);
+		tableEnv.sqlUpdate(ddl3);
+	}
+
 	@Test
 	public void testAlterFunction() throws Exception {
 		String create = "create function f3 as 'org.apache.flink.function.TestFunction'";
@@ -200,7 +217,7 @@ public void testAlterFunctionNonExists() {
 	}

 	@Test
-	public void testAlterTemporaryFunction() {
+	public void testAlterTemporaryCatalogFunction() {
 		String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" +
 			" as 'org.apache.flink.function.TestFunction'";
@@ -210,7 +227,19 @@ public void testAlterTemporaryFunction() {
 		} catch (Exception e) {
 			assertTrue(e.getMessage().equals("Alter temporary catalog function is not supported"));
 		}
+	}

+	@Test
+	public void testAlterTemporarySystemFunction() {
+		String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" +
+			" as 'org.apache.flink.function.TestFunction'";
+
+		try {
+			tableEnv.sqlUpdate(alterTemporary);
+			fail();
+		} catch (Exception e) {
+			assertTrue(e.getMessage().equals("Alter temporary system function is not supported"));
+		}
 	}

 	@Test
@@ -256,7 +285,7 @@ public void testDropTemporaryFunctionNonExits() {
 			fail();
 		} catch (Exception e){
 			assertEquals(e.getMessage(), "Temporary catalog function" +
-				" `default_catalog`.`default_database`.`f4` is not found");
+				" `default_catalog`.`default_database`.`f4` doesn't exist");
 		}

 		try {
@@ -264,7 +293,7 @@ public void testDropTemporaryFunctionNonExits() {
 			fail();
 		} catch (Exception e) {
 			assertEquals(e.getMessage(), "Temporary catalog function " +
-				"`catalog1`.`default_database`.`f4` is not found");
+				"`catalog1`.`default_database`.`f4` doesn't exist");
 		}

 		try {
@@ -272,17 +301,14 @@ public void testDropTemporaryFunctionNonExits() {
 			fail();
 		} catch (Exception e) {
 			assertEquals(e.getMessage(), "Temporary catalog function " +
-				"`default_catalog`.`db1`.`f4` is not found");
+				"`default_catalog`.`db1`.`f4` doesn't exist");
 		}
 	}

 	@Test
 	public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
 		String createNoCatalogDB = "create temporary function f4" +
-			" as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'";
-
-		String alterNoCatalogDB = "alter temporary function f4 " +
-			"as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF2'";
+			" as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";

 		String dropNoCatalogDB = "drop temporary function f4";
@@ -290,7 +316,7 @@ public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
 		tableEnv.sqlUpdate(dropNoCatalogDB);

 		String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" +
-			" as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'";
+			" as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";

 		String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4";
@@ -298,7 +324,7 @@ public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
 		tableEnv.sqlUpdate(dropNonExistsCatalog);

 		String createNonExistsDB = "create temporary function default_catalog.db1.f4" +
-			" as 'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'";
+			" as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";

 		String dropNonExistsDB = "drop temporary function default_catalog.db1.f4";
@@ -306,10 +332,30 @@ public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
 		tableEnv.sqlUpdate(dropNonExistsDB);
 	}

+	@Test
+	public void testDropTemporarySystemFunction() {
+		String ddl1 = "create temporary system function f5" +
+			" as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
+
+		String ddl2 = "drop temporary system function f5";
+
+		String ddl3 = "drop temporary system function if exists f5";
+
+		tableEnv.sqlUpdate(ddl1);
+		tableEnv.sqlUpdate(ddl2);
+		tableEnv.sqlUpdate(ddl3);
+
+		try {
+			tableEnv.sqlUpdate(ddl2);
+		} catch (Exception e) {
+			assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist");
+		}
+	}
+
 	@Test
 	public void testUseDefinedRegularCatalogFunction() throws Exception {
 		String functionDDL = "create function addOne as " +
-			"'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'";
+			"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";

 		String dropFunctionDDL = "drop function addOne";
 		testUseDefinedCatalogFunction(functionDDL);
@@ -320,7 +366,7 @@ public void testUseDefinedRegularCatalogFunction() throws Exception {
 	@Test
 	public void testUseDefinedTemporaryCatalogFunction() throws Exception {
 		String functionDDL = "create temporary function addOne as " +
-			"'org.apache.flink.table.catalog.CatalogFunctionTestBase$TestUDF'";
+			"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";

 		String dropFunctionDDL = "drop temporary function addOne";
 		testUseDefinedCatalogFunction(functionDDL);
@@ -328,6 +374,17 @@ public void testUseDefinedTemporaryCatalogFunction() throws Exception {
 		tableEnv.sqlUpdate(dropFunctionDDL);
 	}

+	@Test
+	public void testUseDefinedTemporarySystemFunction() throws Exception {
+		String functionDDL = "create temporary system function addOne as " +
+			"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
+
+		String dropFunctionDDL = "drop temporary system function addOne";
+		testUseDefinedCatalogFunction(functionDDL);
+		// delete the function
+		tableEnv.sqlUpdate(dropFunctionDDL);
+	}
+
 	private void testUseDefinedCatalogFunction(String createFunctionDDL) throws Exception {
 		List<Row> sourceData = Arrays.asList(
 			toRow(1, "1000", 2),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/CatalogFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
similarity index 84%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/CatalogFunctionITCase.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
index e32629bf321f5..7e2a10f882cc1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/CatalogFunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
@@ -20,15 +20,14 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogFunctionTestBase;
+import org.apache.flink.table.functions.FunctionTestBase;

 import org.junit.BeforeClass;

 /**
- * Tests for {@link CatalogFunction} in batch table environment.
+ * Tests for catalog and system functions in batch table environment.
 */
-public class CatalogFunctionITCase extends CatalogFunctionTestBase {
+public class FunctionITCase extends FunctionTestBase {
 	private static ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

 	@BeforeClass
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/CatalogFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
similarity index 85%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/CatalogFunctionITCase.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index 1911154a8012a..045c4b0c5a5dd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/CatalogFunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -20,15 +20,14 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogFunctionTestBase;
+import org.apache.flink.table.functions.FunctionTestBase;

 import org.junit.BeforeClass;

 /**
- * Tests for {@link CatalogFunction} in stream table environment.
+ * Tests for catalog and system functions in stream table environment.
 */
-public class CatalogFunctionITCase extends CatalogFunctionTestBase {
+public class FunctionITCase extends FunctionTestBase {
 	private static StreamExecutionEnvironment streamExecEnvironment;

 	@BeforeClass