Skip to content

Commit

Permalink
[FLINK-15487][table] Allow registering FLIP-65 functions in TableEnvi…
Browse files Browse the repository at this point in the history
…ronment

Updates all catalog related interfaces to FLIP-65. It also continues FLIP-64 by
exposing new interfaces in table environment for registering functions of
temporary/system/catalog kind. Furthermore, it adds early class validation
to the function catalog.

The first functions will be fully functional once the code generator has been
updated.

This closes apache#10942.
  • Loading branch information
twalthr committed Jan 29, 2020
1 parent 631722d commit fe284c8
Show file tree
Hide file tree
Showing 31 changed files with 1,836 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ def excluded_methods(cls):
'getCompletionHints',
'create',
'loadModule',
'unloadModule'}
'unloadModule',
'createTemporarySystemFunction',
'dropTemporarySystemFunction',
'createFunction',
'dropFunction',
'createTemporaryFunction',
'dropTemporaryFunction'}

@classmethod
def java_method_name(cls, python_method_name):
Expand Down
8 changes: 8 additions & 0 deletions flink-table/flink-table-api-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.api;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -26,10 +27,12 @@
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;

import java.io.Serializable;
import java.util.Optional;

/**
Expand Down Expand Up @@ -131,15 +134,162 @@ static TableEnvironment create(EnvironmentSettings settings) {
/**
* Registers a {@link ScalarFunction} under a unique name. Replaces already existing
* user-defined functions under this name.
*
* @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. Please
* note that the new method also uses the new type system and reflective extraction logic. It
* might be necessary to update the function implementation as well. See the documentation of
* {@link ScalarFunction} for more information on the new function design.
*/
@Deprecated
void registerFunction(String name, ScalarFunction function);

/**
* Registers a {@link UserDefinedFunction} class as a temporary system function.
*
* <p>Compared to {@link #createTemporaryFunction(String, Class)}, system functions are identified
* by a global name that is independent of the current catalog and current database. Thus, this method
* allows to extend the set of built-in system functions like {@code TRIM}, {@code ABS}, etc.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name exists,
* it will be inaccessible in the current session. To make the permanent function available again
* one can drop the corresponding temporary system function.
*
* @param name The name under which the function will be registered globally.
* @param functionClass The function class containing the implementation.
*/
@Experimental
void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

/**
* Registers a {@link UserDefinedFunction} instance as a temporary system function.
*
* <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method takes a function
* instance that might have been parameterized before (e.g. through its constructor). This might be
* useful for more interactive sessions. Make sure that the instance is {@link Serializable}.
*
* <p>Compared to {@link #createTemporaryFunction(String, UserDefinedFunction)}, system functions are
* identified by a global name that is independent of the current catalog and current database. Thus,
* this method allows to extend the set of built-in system functions like {@code TRIM}, {@code ABS}, etc.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name exists,
* it will be inaccessible in the current session. To make the permanent function available again
* one can drop the corresponding temporary system function.
*
* @param name The name under which the function will be registered globally.
* @param functionInstance The (possibly pre-configured) function instance containing the implementation.
*/
@Experimental
void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);

/**
* Drops a temporary system function registered under the given name.
*
* <p>If a permanent function with the given name exists, it will be used from now on for any queries
* that reference this name.
*
* @param name The name under which the function has been registered globally.
* @return true if a function existed under the given name and was removed
*/
@Experimental
boolean dropTemporarySystemFunction(String name);

/**
* Registers a {@link UserDefinedFunction} class as a catalog function in the given path.
*
* <p>Compared to system functions with a globally defined name, catalog functions are always (implicitly
* or explicitly) identified by a catalog and database.
*
* <p>There must not be another function (temporary or permanent) registered under the same path.
*
* @param path The path under which the function will be registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param functionClass The function class containing the implementation.
*/
@Experimental
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

/**
* Registers a {@link UserDefinedFunction} class as a catalog function in the given path.
*
* <p>Compared to system functions with a globally defined name, catalog functions are always (implicitly
* or explicitly) identified by a catalog and database.
*
* @param path The path under which the function will be registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param functionClass The function class containing the implementation.
* @param ignoreIfExists If a function exists under the given path and this flag is set, no operation
* is executed. An exception is thrown otherwise.
*/
@Experimental
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists);

/**
* Drops a catalog function registered in the given path.
*
* @param path The path under which the function has been registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @return true if a function existed in the given path and was removed
*/
@Experimental
boolean dropFunction(String path);

/**
* Registers a {@link UserDefinedFunction} class as a temporary catalog function.
*
* <p>Compared to {@link #createTemporarySystemFunction(String, Class)} with a globally defined name,
* catalog functions are always (implicitly or explicitly) identified by a catalog and database.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name exists,
* it will be inaccessible in the current session. To make the permanent function available again
* one can drop the corresponding temporary function.
*
* @param path The path under which the function will be registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param functionClass The function class containing the implementation.
*/
@Experimental
void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass);

/**
* Registers a {@link UserDefinedFunction} instance as a temporary catalog function.
*
* <p>Compared to {@link #createTemporaryFunction(String, Class)}, this method takes a function instance
* that might have been parameterized before (e.g. through its constructor). This might be useful for more
* interactive sessions. Make sure that the instance is {@link Serializable}.
*
* <p>Compared to {@link #createTemporarySystemFunction(String, UserDefinedFunction)} with a globally
* defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name exists,
* it will be inaccessible in the current session. To make the permanent function available again
* one can drop the corresponding temporary function.
*
* @param path The path under which the function will be registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @param functionInstance The (possibly pre-configured) function instance containing the implementation.
*/
@Experimental
void createTemporaryFunction(String path, UserDefinedFunction functionInstance);

/**
* Drops a temporary catalog function registered in the given path.
*
* <p>If a permanent function with the given path exists, it will be used from now on for any queries
* that reference this path.
*
* @param path The path under which the function will be registered.
* See also the {@link TableEnvironment} class description for the format of the path.
* @return true if a function existed in the given path and was removed
*/
@Experimental
boolean dropTemporaryFunction(String path);

/**
* Registers a {@link Table} under a unique name in the TableEnvironment's catalog.
* Registered tables can be referenced in SQL queries.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* be inaccessible in the current session. To make the permanent object available again one can drop the
* corresponding temporary object.
*
* @param name The name under which the table will be registered.
Expand All @@ -153,7 +303,7 @@ static TableEnvironment create(EnvironmentSettings settings) {
* Registers a {@link Table} API object as a temporary view similar to SQL temporary views.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* be inaccessible in the current session. To make the permanent object available again one can drop the
* corresponding temporary object.
*
* @param path The path under which the view will be registered.
Expand All @@ -167,7 +317,7 @@ static TableEnvironment create(EnvironmentSettings settings) {
* Registered tables can be referenced in SQL queries.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* be inaccessible in the current session. To make the permanent object available again one can drop the
* corresponding temporary object.
*
* @param name The name under which the {@link TableSource} is registered.
Expand All @@ -183,7 +333,7 @@ static TableEnvironment create(EnvironmentSettings settings) {
* Registered sink tables can be referenced in SQL DML statements.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* be inaccessible in the current session. To make the permanent object available again one can drop the
* corresponding temporary object.
*
* @param name The name under which the {@link TableSink} is registered.
Expand All @@ -201,7 +351,7 @@ static TableEnvironment create(EnvironmentSettings settings) {
* Registered sink tables can be referenced in SQL DML statements.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
* be inaccessible in the current session. To make the permanent object available again you can drop the
* be inaccessible in the current session. To make the permanent object available again one can drop the
* corresponding temporary object.
*
* @param name The name under which the {@link TableSink} is registered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
Expand Down Expand Up @@ -273,6 +274,72 @@ public void registerFunction(String name, ScalarFunction function) {
function);
}

@Override
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass) {
final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(functionClass);
createTemporarySystemFunction(name, functionInstance);
}

@Override
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance) {
functionCatalog.registerTemporarySystemFunction(
name,
functionInstance,
false);
}

@Override
public boolean dropTemporarySystemFunction(String name) {
return functionCatalog.dropTemporarySystemFunction(
name,
true);
}

@Override
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass) {
createFunction(path, functionClass, false);
}

@Override
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists) {
final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
functionCatalog.registerCatalogFunction(
unresolvedIdentifier,
functionClass,
ignoreIfExists);
}

@Override
public boolean dropFunction(String path) {
final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
return functionCatalog.dropCatalogFunction(
unresolvedIdentifier,
true);
}

@Override
public void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass) {
final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(functionClass);
createTemporaryFunction(path, functionInstance);
}

@Override
public void createTemporaryFunction(String path, UserDefinedFunction functionInstance) {
final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
functionCatalog.registerTemporaryCatalogFunction(
unresolvedIdentifier,
functionInstance,
false);
}

@Override
public boolean dropTemporaryFunction(String path) {
final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
return functionCatalog.dropTemporaryCatalogFunction(
unresolvedIdentifier,
true);
}

@Override
public void registerTable(String name, Table table) {
UnresolvedIdentifier identifier = UnresolvedIdentifier.of(name);
Expand Down Expand Up @@ -354,7 +421,7 @@ public void insertInto(String targetPath, Table table) {
public void insertInto(Table table, String sinkPath, String... sinkPathContinued) {
List<String> fullPath = new ArrayList<>(Arrays.asList(sinkPathContinued));
fullPath.add(0, sinkPath);
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(fullPath.toArray(new String[0]));
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(fullPath);

insertIntoInternal(unresolvedIdentifier, table);
}
Expand Down Expand Up @@ -871,15 +938,14 @@ private void createSystemFunction(CreateTempSystemFunctionOperation operation) {
}

private void dropSystemFunction(DropTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
functionCatalog.dropTempSystemFunction(
functionCatalog.dropTemporarySystemFunction(
operation.getFunctionName(),
operation.isIfExists());
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new TableException(exMsg, e);
throw new TableException(getDDLOpExecuteErrorMsg(operation.asSummaryString()), e);
}
}

Expand Down
Loading

0 comments on commit fe284c8

Please sign in to comment.