Skip to content

Commit

Permalink
[FLINK-14906][table] create and drop temp system functions from DDL t…
Browse files Browse the repository at this point in the history
…o FunctionCatalog

this closes apache#10484.
  • Loading branch information
HuangZhenQiu authored and bowenli86 committed Dec 8, 2019
1 parent 42276ba commit 5bc914e
Show file tree
Hide file tree
Showing 21 changed files with 610 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public boolean isTemporary() {
return isTemporary;
}

public boolean isSystemFunction() {
return isSystemFunction;
}

public boolean isIfExists() {
return this.ifExists;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public boolean isTemporary() {
return isTemporary;
}

public boolean isSystemFunction() {
return isSystemFunction;
}

public boolean getIfExists() {
return this.ifExists;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropFunctionOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
Expand Down Expand Up @@ -571,12 +573,20 @@ public void sqlUpdate(String stmt) {
} else if (operation instanceof CreateFunctionOperation) {
CreateFunctionOperation createFunctionOperation = (CreateFunctionOperation) operation;
createCatalogFunction(createFunctionOperation);
} else if (operation instanceof CreateTempSystemFunctionOperation) {
CreateTempSystemFunctionOperation createtempSystemFunctionOperation =
(CreateTempSystemFunctionOperation) operation;
createSystemFunction(createtempSystemFunctionOperation);
} else if (operation instanceof AlterFunctionOperation) {
AlterFunctionOperation alterFunctionOperation = (AlterFunctionOperation) operation;
alterCatalogFunction(alterFunctionOperation);
} else if (operation instanceof DropFunctionOperation) {
DropFunctionOperation dropFunctionOperation = (DropFunctionOperation) operation;
dropCatalogFunction(dropFunctionOperation);
} else if (operation instanceof DropTempSystemFunctionOperation) {
DropTempSystemFunctionOperation dropTempSystemFunctionOperation =
(DropTempSystemFunctionOperation) operation;
dropSystemFunction(dropTempSystemFunctionOperation);
} else if (operation instanceof UseCatalogOperation) {
UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
Expand Down Expand Up @@ -746,11 +756,9 @@ private void createCatalogFunction(CreateFunctionOperation createFunctionOperati
createFunctionOperation.getFunctionIdentifier());
if (!exist) {
FunctionDefinition functionDefinition = FunctionDefinitionUtil.createFunctionDefinition(
createFunctionOperation.getFunctionName(),
function);
registerFunctionInFunctionCatalog(
createFunctionOperation.getFunctionIdentifier(),
functionDefinition);
createFunctionOperation.getFunctionName(), function.getClassName());
registerCatalogFunctionInFunctionCatalog(
createFunctionOperation.getFunctionIdentifier(), functionDefinition);
} else if (!createFunctionOperation.isIgnoreIfExists()) {
throw new ValidationException(
String.format("Temporary catalog function %s is already defined",
Expand Down Expand Up @@ -802,17 +810,9 @@ private void dropCatalogFunction(DropFunctionOperation dropFunctionOperation) {
String exMsg = getDDLOpExecuteErrorMsg(dropFunctionOperation.asSummaryString());
try {
if (dropFunctionOperation.isTemporary()) {
boolean exist = functionCatalog.hasTemporaryCatalogFunction(
dropFunctionOperation.getFunctionIdentifier());
if (exist) {
functionCatalog.dropTempCatalogFunction(
dropFunctionOperation.getFunctionIdentifier(),
dropFunctionOperation.isIfExists());
} else if (!dropFunctionOperation.isIfExists()) {
throw new ValidationException(
String.format("Temporary catalog function %s is not found",
dropFunctionOperation.getFunctionIdentifier().asSerializableString()));
}
functionCatalog.dropTempCatalogFunction(
dropFunctionOperation.getFunctionIdentifier(),
dropFunctionOperation.isIfExists());
} else {
Catalog catalog = getCatalogOrThrowException
(dropFunctionOperation.getFunctionIdentifier().getCatalogName());
Expand All @@ -830,11 +830,47 @@ private void dropCatalogFunction(DropFunctionOperation dropFunctionOperation) {
}
}

private <T, ACC> void registerFunctionInFunctionCatalog(
private void createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
boolean exist = functionCatalog.hasTemporarySystemFunction(operation.getFunctionName());
if (!exist) {
FunctionDefinition functionDefinition = FunctionDefinitionUtil.createFunctionDefinition(
operation.getFunctionName(),
operation.getFunctionClass());
registerSystemFunctionInFunctionCatalog(operation.getFunctionName(), functionDefinition);

} else if (!operation.isIgnoreIfExists()) {
throw new ValidationException(
String.format("Temporary system function %s is already defined",
operation.getFunctionName()));
}
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new TableException(exMsg, e);
}
}

private void dropSystemFunction(DropTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
functionCatalog.dropTempSystemFunction(
operation.getFunctionName(),
operation.isIfExists());
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new TableException(exMsg, e);
}
}

private <T, ACC> void registerCatalogFunctionInFunctionCatalog(
ObjectIdentifier functionIdentifier, FunctionDefinition functionDefinition) {
if (functionDefinition instanceof ScalarFunctionDefinition) {
ScalarFunctionDefinition scalarFunction = (ScalarFunctionDefinition) functionDefinition;
functionCatalog.registerTempCatalogScalarFunction(functionIdentifier, scalarFunction.getScalarFunction());
functionCatalog.registerTempCatalogScalarFunction(
functionIdentifier, scalarFunction.getScalarFunction());
} else if (functionDefinition instanceof AggregateFunctionDefinition) {
AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition;
AggregateFunction<T, ACC > aggregateFunction =
Expand All @@ -861,6 +897,41 @@ private <T, ACC> void registerFunctionInFunctionCatalog(
}
}

private <T, ACC> void registerSystemFunctionInFunctionCatalog(
String functionName, FunctionDefinition functionDefinition) {

if (functionDefinition instanceof ScalarFunctionDefinition) {
ScalarFunctionDefinition scalarFunction = (ScalarFunctionDefinition) functionDefinition;
functionCatalog.registerTempSystemScalarFunction(
functionName, scalarFunction.getScalarFunction());
} else if (functionDefinition instanceof AggregateFunctionDefinition) {
AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition;
AggregateFunction<T, ACC > aggregateFunction =
(AggregateFunction<T, ACC >) aggregateFunctionDefinition.getAggregateFunction();
TypeInformation<T> typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
functionCatalog.registerTempSystemAggregateFunction(
functionName,
aggregateFunction,
typeInfo,
accTypeInfo);

} else if (functionDefinition instanceof TableFunctionDefinition) {
TableFunctionDefinition tableFunctionDefinition = (TableFunctionDefinition) functionDefinition;
TableFunction<T> tableFunction = (TableFunction<T>) tableFunctionDefinition.getTableFunction();
TypeInformation<T> typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tableFunction);

functionCatalog.registerTempSystemTableFunction(
functionName,
tableFunction,
typeInfo);
}

}

protected TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public CatalogFunctionImpl(String className) {
this(className, FunctionLanguage.JAVA, false);
}

public CatalogFunctionImpl(String className, FunctionLanguage functionLanguage, boolean isTemporary) {
public CatalogFunctionImpl(
String className,
FunctionLanguage functionLanguage,
boolean isTemporary) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(className), "className cannot be null or empty");
this.className = className;
this.functionLanguage = checkNotNull(functionLanguage, "functionLanguage cannot be null");
Expand Down Expand Up @@ -92,8 +95,8 @@ public FunctionLanguage getFunctionLanguage() {
public String toString() {
return "CatalogFunctionImpl{" +
"className='" + getClassName() + "', " +
"functionLanguage='" + getFunctionLanguage() +
"isGeneric='" + isGeneric() +
"functionLanguage='" + getFunctionLanguage() + "', " +
"isGeneric='" + isGeneric() + "', " +
"isTemporary='" + isTemporary() +
"'}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ public boolean hasTemporaryCatalogFunction(ObjectIdentifier functionIdentifier)
return tempCatalogFunctions.containsKey(normalizedIdentifier);
}

/**
* Check whether a temporary system function is already registered.
* @param functionName the name of the function
* @return whether the temporary system function exists in the function catalog
*/
public boolean hasTemporarySystemFunction(String functionName) {
return tempSystemFunctions.containsKey(functionName);
}

/**
* Drop a temporary system function.
*
Expand Down Expand Up @@ -328,7 +337,8 @@ private Optional<FunctionLookup.Result> resolvePreciseFunctionReference(ObjectId
fd = catalog.getFunctionDefinitionFactory().get()
.createFunctionDefinition(oi.getObjectName(), catalogFunction);
} else {
fd = FunctionDefinitionUtil.createFunctionDefinition(oi.getObjectName(), catalogFunction);
fd = FunctionDefinitionUtil.createFunctionDefinition(
oi.getObjectName(), catalogFunction.getClassName());
}

return Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@

package org.apache.flink.table.functions;

import org.apache.flink.table.catalog.CatalogFunction;

/**
* A util to instantiate {@link FunctionDefinition} in the default way.
*/
public class FunctionDefinitionUtil {

public static FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
public static FunctionDefinition createFunctionDefinition(String name, String className) {
// Currently only handles Java class-based functions
Object func;
try {
func = Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance();
func = Thread.currentThread().getContextClassLoader().loadClass(className).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new IllegalStateException(
String.format("Failed instantiating '%s'", catalogFunction.getClassName()), e);
String.format("Failed instantiating '%s'", className), e);
}

UserDefinedFunction udf = (UserDefinedFunction) func;
Expand Down Expand Up @@ -69,7 +67,7 @@ public static FunctionDefinition createFunctionDefinition(String name, CatalogFu
);
} else {
throw new UnsupportedOperationException(
String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", catalogFunction.getClassName())
String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", className)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Map;

/**
* Operation to describe a ALTER FUNCTION statement.
* Operation to describe a ALTER FUNCTION statement for temporary catalog function.
*/
public class AlterFunctionOperation implements AlterOperation {
private final ObjectIdentifier functionIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
import java.util.Map;

/**
* Operation to describe a CREATE FUNCTION statement.
* Operation to describe a CREATE FUNCTION statement for catalog function.
*/
public class CreateFunctionOperation implements CreateOperation {
private final ObjectIdentifier functionIdentifier;
private CatalogFunction catalogFunction;
private boolean ignoreIfExists;

public CreateFunctionOperation(
ObjectIdentifier functionIdentifier,
CatalogFunction catalogFunction,
boolean ignoreIfExists) {
ObjectIdentifier functionIdentifier,
CatalogFunction catalogFunction,
boolean ignoreIfExists) {
this.functionIdentifier = functionIdentifier;
this.catalogFunction = catalogFunction;
this.ignoreIfExists = ignoreIfExists;
Expand All @@ -64,7 +64,7 @@ public String asSummaryString() {
params.put("ignoreIfExists", ignoreIfExists);

return OperationUtils.formatWithChildren(
"CREATE FUNCTION",
"CREATE CATALOG FUNCTION",
params,
Collections.emptyList(),
Operation::asSummaryString);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* Operation to describe a CREATE FUNCTION statement for temporary system function.
*/
public class CreateTempSystemFunctionOperation implements CreateOperation {
private final String functionName;
private String functionClass;
private boolean ignoreIfExists;

public CreateTempSystemFunctionOperation(
String functionName,
String functionClass,
boolean ignoreIfExists) {
this.functionName = functionName;
this.functionClass = functionClass;
this.ignoreIfExists = ignoreIfExists;
}

public String getFunctionName() {
return this.functionName;
}

public String getFunctionClass() {
return this.functionClass;
}

public boolean isIgnoreIfExists() {
return this.ignoreIfExists;
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("functionName", functionName);
params.put("functionClass", functionClass);
params.put("ignoreIfExists", ignoreIfExists);

return OperationUtils.formatWithChildren(
"CREATE TEMPORARY SYSTEM FUNCTION",
params,
Collections.emptyList(),
Operation::asSummaryString);
}
}
Loading

0 comments on commit 5bc914e

Please sign in to comment.