From 269018683aa4d900948bc6ff6061089fb51f5689 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Sun, 25 Nov 2018 00:21:31 +0800 Subject: [PATCH] [FLINK-10689] [table] Port UDF interfaces to flink-table-common This commit ports all UDF extension points to Java and relocates them to flink-table-common. Projects that just want to provide functions don't need to import flink-table and thus Scala anymore. This closes #7059. --- .../flink-sql-client-test/pom.xml | 2 +- .../table/functions/AggregateFunction.java | 115 ++++++++++++++ .../table/functions/FunctionContext.java | 80 ++++++++++ .../flink/table/functions/ScalarFunction.java | 95 +++++++++++ .../flink/table/functions/TableFunction.java | 120 ++++++++++++++ .../table/functions/UserDefinedFunction.java | 70 +++++++++ .../flink/table/api/scala/expressionDsl.scala | 8 +- .../table/functions/AggregateFunction.scala | 139 ---------------- .../table/functions/FunctionContext.scala | 66 -------- .../table/functions/ScalarFunction.scala | 98 ------------ .../flink/table/functions/TableFunction.scala | 148 ------------------ .../table/functions/UserDefinedFunction.scala | 61 -------- .../utils/userDefinedScalarFunctions.scala | 1 + 13 files changed, 489 insertions(+), 514 deletions(-) create mode 100644 flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java create mode 100644 flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java create mode 100644 flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java create mode 100644 flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java create mode 100644 flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index 1a6a80e9c8f3f..6a0534c58edcf 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -33,7 +33,7 @@ under the License. org.apache.flink - flink-table_${scala.binary.version} + flink-table-common ${project.version} provided diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java new file mode 100644 index 0000000000000..63a4d3f7a2f10 --- /dev/null +++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Base class for User-Defined Aggregates. + * + *
+ * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
+ * methods. An {@link AggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - getValue.
+ *
+ * <p>There are a few other methods that are optional:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * <p>All these methods must be declared publicly, must not be static, and must be named exactly
+ * as mentioned above. The methods createAccumulator and getValue are declared in
+ * {@link AggregateFunction} itself, while the other methods are explained below.
+ *
+ * <p>Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
+ * requires at least one accumulate() method.
+ *
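+ * <p>A sketch of the expected signature, mirroring the Scala documentation that this class
+ * replaces (the bracketed placeholder stands for any number of user-defined parameters):
+ * <pre>
+ * {@code
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *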
+ * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates on data streams.
+ *
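+ * <p>A sketch of the expected signature, again taken over from the Scala documentation (the
+ * bracketed placeholder stands for any number of user-defined parameters):
+ * <pre>
+ * {@code
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *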
+ * <p>Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for session window grouping aggregates on data streams and for grouping aggregates
+ * on data sets.
+ *
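+ * <p>A sketch of the expected signature, taken over from the Scala documentation. Note that the
+ * given accumulator may already contain previously aggregated results, so it should be merged
+ * into rather than replaced or cleared:
+ * <pre>
+ * {@code
+ * public void merge(ACC accumulator, java.lang.Iterable<ACC> its)
+ * }
+ * </pre>
+ *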
+ * <p>Resets the accumulator for this {@link AggregateFunction}. This method must be implemented
+ * for grouping aggregates on data sets.
+ *
+ * @param <T>   the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ *              aggregated values which are needed to compute an aggregation result.
+ *              An AggregateFunction represents its state using the accumulator, thereby the state
+ *              of the AggregateFunction must be put into the accumulator.
+ */
+@PublicEvolving
+public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
+
+	/**
+	 * Creates and initializes the accumulator for this {@link AggregateFunction}.
+	 *
+	 * @return the accumulator with the initial value
+	 */
+	public abstract ACC createAccumulator();
+
+	/**
+	 * Called every time when an aggregation result should be materialized.
+	 * The returned value could be either an early and incomplete result
+	 * (periodically emitted as data arrives) or the final result of the
+	 * aggregation.
+	 *
+	 * @param accumulator the accumulator which contains the current
+	 *                    aggregated results
+	 * @return the aggregation result
+	 */
+	public abstract T getValue(ACC accumulator);
+
+	/**
+	 * Returns true if this AggregateFunction can only be applied in an OVER window.
+	 *
+	 * @return true if the AggregateFunction requires an OVER window, false otherwise.
+	 */
+	public boolean requiresOver() {
+		return false;
+	}
+
+	/**
+	 * Returns the TypeInformation of the AggregateFunction's result.
+	 *
+	 * @return the TypeInformation of the AggregateFunction's result or null if the result type
+	 *         should be automatically inferred.
+	 */
+	public TypeInformation<T> getResultType() {
+		return null;
+	}
+
+	/**
+	 * Returns the TypeInformation of the AggregateFunction's accumulator.
+	 *
+	 * @return the TypeInformation of the AggregateFunction's accumulator or null if the
+	 *         accumulator type should be automatically inferred.
+	 */
+	public TypeInformation<ACC> getAccumulatorType() {
+		return null;
+	}
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
new file mode 100644
index 0000000000000..93b2229a7c631
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.File;
+
+/**
+ * A FunctionContext allows obtaining global runtime information about the context in which the
+ * user-defined function is executed. The information includes the metric group,
+ * the distributed cache files, and the global job parameters.
+ */
+@PublicEvolving
+public class FunctionContext {
+
+	/**
+	 * The runtime context in which the Flink Function is executed.
+	 */
+	private RuntimeContext context;
+
+	public FunctionContext(RuntimeContext context) {
+		this.context = context;
+	}
+
+	/**
+	 * Returns the metric group for this parallel subtask.
+	 *
+	 * @return metric group for this parallel subtask.
+	 */
+	public MetricGroup getMetricGroup() {
+		return context.getMetricGroup();
+	}
+
+	/**
+	 * Gets the local temporary file copy of a distributed cache file.
+	 *
+	 * @param name distributed cache file name
+	 * @return local temporary file copy of a distributed cache file.
+	 */
+	public File getCachedFile(String name) {
+		return context.getDistributedCache().getFile(name);
+	}
+
+	/**
+	 * Gets the global job parameter value associated with the given key as a string.
+	 *
+	 * @param key key pointing to the associated value
+	 * @param defaultValue default value which is returned in case the global job parameters are
+	 *                     null or there is no value associated with the given key
+	 * @return (default) value associated with the given key
+	 */
+	public String getJobParameter(String key, String defaultValue) {
+		GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+		if (conf != null && conf.toMap().containsKey(key)) {
+			return conf.toMap().get(key);
+		} else {
+			return defaultValue;
+		}
+	}
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
new file mode 100644
index 0000000000000..091258ececf05
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
+ * or multiple scalar values to a new scalar value.
+ *
+ * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
+ * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+ * can also be overloaded by implementing multiple methods named "eval".
+ *
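+ * <p>A minimal sketch of such a function (the HashCode class below is purely illustrative and
+ * not part of this API):
+ * <pre>
+ * {@code
+ * public class HashCode extends ScalarFunction {
+ *   public int eval(String s) {
+ *     return s.hashCode();
+ *   }
+ * }
+ * }
+ * </pre>
+ *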
+ * <p>User-defined functions must have a default constructor and must be instantiable at runtime.
+ *
+ * <p>By default, the result type of an evaluation method is determined by Flink's type extraction
+ * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+ * complex, custom, or composite types. In these cases, {@link TypeInformation} of the result type
+ * can be manually defined by overriding {@link ScalarFunction#getResultType}.
+ *
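+ * <p>For example, a sketch of such an override, assuming the eval method produces values that
+ * should be treated as SQL TIMESTAMP (Types refers to org.apache.flink.api.common.typeinfo.Types):
+ * <pre>
+ * {@code
+ * public TypeInformation<?> getResultType(Class<?>[] signature) {
+ *   return Types.SQL_TIMESTAMP;
+ * }
+ * }
+ * </pre>
+ *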
+ * <p>Internally, the Table/SQL API code generation works with primitive values as much as
+ * possible. If a user-defined scalar function should not introduce much overhead during runtime,
+ * it is recommended to declare parameters and result types as primitive types instead of their
+ * boxed classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ */
+@PublicEvolving
+public abstract class ScalarFunction extends UserDefinedFunction {
+
+	/**
+	 * Returns the result type of the evaluation method with a given signature.
+	 *
+	 * <p>This method needs to be overridden in case Flink's type extraction facilities are not
+	 * sufficient to extract the {@link TypeInformation} based on the return type of the evaluation
+	 * method. Flink's type extraction facilities can handle basic types or
+	 * simple POJOs but might be wrong for more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method the return type needs to be determined
+	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 */
+	public TypeInformation<?> getResultType(Class<?>[] signature) {
+		return null;
+	}
+
+	/**
+	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
+	 * signature.
+	 *
+	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might
+	 * be necessary to determine the parameter {@link TypeInformation} of an evaluation method.
+	 * By default, Flink's type extraction facilities are used for this but might be wrong for
+	 * more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method the operand types need to be determined
+	 * @return {@link TypeInformation} of operand types
+	 */
+	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
+
+		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+			try {
+				return TypeExtractor.getForClass(c);
+			} catch (InvalidTypesException e) {
+				throw new ValidationException(
+					"Parameter types of scalar function " + this.getClass().getCanonicalName() + " cannot be " +
+					"automatically determined. Please provide type information manually.");
+			}
+		}).collect(Collectors.toList());

+		return typeList.toArray(new TypeInformation<?>[0]);
+	}
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
new file mode 100644
index 0000000000000..ae6b03a7d4f2b
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Collector;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for a user-defined table function (UDTF). A user-defined table function works on
+ * zero, one, or multiple scalar values as input and returns multiple rows as output.
+ *
+ * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
+ * method. An evaluation method must be declared publicly, must not be static, and must be named
+ * "eval". Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ *
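+ * <p>For example (adapted from the example in the Scala documentation that this file replaces):
+ * <pre>
+ * {@code
+ * public class Split extends TableFunction<String> {
+ *
+ *   // implement an "eval" method with as many parameters as you want
+ *   public void eval(String str) {
+ *     for (String s : str.split(" ")) {
+ *       collect(s); // use collect(...) to emit an output row
+ *     }
+ *   }
+ * }
+ * }
+ * </pre>
+ *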
+ * <p>User-defined functions must have a default constructor and must be instantiable at runtime.
+ *
+ * <p>By default, the result type of an evaluation method is determined by Flink's type extraction
+ * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+ * complex, custom, or composite types. In these cases, {@link TypeInformation} of the result type
+ * can be manually defined by overriding {@link TableFunction#getResultType}.
+ *
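+ * <p>A sketch of such an override for a function that emits rows of (String, Integer); Types
+ * refers to org.apache.flink.api.common.typeinfo.Types, Row to org.apache.flink.types.Row, and
+ * the field types are purely illustrative:
+ * <pre>
+ * {@code
+ * public TypeInformation<Row> getResultType() {
+ *   return Types.ROW(Types.STRING, Types.INT);
+ * }
+ * }
+ * </pre>
+ *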
+ * <p>Internally, the Table/SQL API code generation works with primitive values as much as
+ * possible. If a user-defined table function should not introduce much overhead during runtime,
+ * it is recommended to declare parameters and result types as primitive types instead of their
+ * boxed classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ *
+ * @param <T> the type of the output row
+ */
+@PublicEvolving
+public abstract class TableFunction<T> extends UserDefinedFunction {
+
+	/**
+	 * Emits an output row.
+	 *
+	 * @param row the output row
+	 */
+	protected void collect(T row) {
+		collector.collect(row);
+	}
+
+	/**
+	 * The code-generated collector used to emit rows.
+	 */
+	protected Collector<T> collector;
+
+	/**
+	 * Internal use. Sets the current collector.
+	 */
+	public final void setCollector(Collector<T> collector) {
+		this.collector = collector;
+	}
+
+	/**
+	 * Returns the result type of the evaluation method with a given signature.
+	 *
+	 * <p>This method needs to be overridden in case Flink's type extraction facilities are not
+	 * sufficient to extract the {@link TypeInformation} based on the return type of the evaluation
+	 * method. Flink's type extraction facilities can handle basic types or
+	 * simple POJOs but might be wrong for more complex, custom, or composite types.
+	 *
+	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 */
+	public TypeInformation<T> getResultType() {
+		return null;
+	}
+
+	/**
+	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
+	 * signature.
+	 *
+	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might
+	 * be necessary to determine the parameter {@link TypeInformation} of an evaluation method.
+	 * By default, Flink's type extraction facilities are used for this but might be wrong for
+	 * more complex, custom, or composite types.
+	 *
+	 * @param signature signature of the method the operand types need to be determined
+	 * @return {@link TypeInformation} of operand types
+	 */
+	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {

+		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+			try {
+				return TypeExtractor.getForClass(c);
+			} catch (InvalidTypesException e) {
+				throw new ValidationException(
+					"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
+					"automatically determined. Please provide type information manually.");
+			}
+		}).collect(Collectors.toList());

+		return typeList.toArray(new TypeInformation<?>[0]);
+	}
+
+}
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
new file mode 100644
index 0000000000000..6c91c10490756
--- /dev/null
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.io.Serializable;
+
+/**
+ * Base class for all user-defined functions such as scalar functions, table functions,
+ * or aggregation functions.
+ */
+@PublicEvolving
+public abstract class UserDefinedFunction implements Serializable {
+
+	/**
+	 * Setup method for user-defined function. It can be used for initialization work.
+	 * By default, this method does nothing.
+	 */
+	public void open(FunctionContext context) throws Exception {
+
+	}
+
+	/**
+	 * Tear-down method for user-defined function. It can be used for clean-up work.
+	 * By default, this method does nothing.
+	 */
+	public void close() throws Exception {
+
+	}
+
+	/**
+	 * @return true if and only if a call to this function is guaranteed to always return
+	 *         the same result given the same parameters. True is assumed by default. If the
+	 *         function is not purely functional (like random(), date(), now()), then
+ * isDeterministic must return false + */ + public boolean isDeterministic() { + return true; + } + + public String functionIdentifier() { + String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this))); + return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5); + } + + /** + * Returns the name of the UDF that is used for plan explain and logging. + */ + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index f9fb93cb7faaa..390960ca8551f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -28,7 +28,7 @@ import org.apache.flink.table.api.Table import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction} +import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction, ScalarFunction} import scala.language.implicitConversions @@ -1021,6 +1021,12 @@ trait ImplicitExpressionConversions { def expr = Literal(sqlTimestamp) } + implicit class ScalarFunctionCallExpression(val s: ScalarFunction) { + def apply(params: Expression*): Expression = { + ScalarFunctionCall(s, params) + } + } + implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) implicit def byte2Literal(b: Byte): Expression = Literal(b) implicit def short2Literal(s: Short): Expression = Literal(s) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala deleted file mode 100644 index d3f9497e1bb5b..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.functions - -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * Base class for User-Defined Aggregates. - * - * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom - * methods. An [[AggregateFunction]] needs at least three methods: - * - createAccumulator, - * - accumulate, and - * - getValue. 
- * - * There are a few other methods that can be optional to have: - * - retract, - * - merge, and - * - resetAccumulator - * - * All these methods must be declared publicly, not static and named exactly as the names - * mentioned above. The methods createAccumulator and getValue are defined in the - * [[AggregateFunction]] functions, while other methods are explained below. - * - * - * {{{ - * Processes the input values and update the provided accumulator instance. The method - * accumulate can be overloaded with different custom types and arguments. An AggregateFunction - * requires at least one accumulate() method. - * - * @param accumulator the accumulator which contains the current aggregated results - * @param [user defined inputs] the input value (usually obtained from a new arrived data). - * - * def accumulate(accumulator: ACC, [user defined inputs]): Unit - * }}} - * - * - * {{{ - * Retracts the input values from the accumulator instance. The current design assumes the - * inputs are the values that have been previously accumulated. The method retract can be - * overloaded with different custom types and arguments. This function must be implemented for - * datastream bounded over aggregate. - * - * @param accumulator the accumulator which contains the current aggregated results - * @param [user defined inputs] the input value (usually obtained from a new arrived data). - * - * def retract(accumulator: ACC, [user defined inputs]): Unit - * }}} - * - * - * {{{ - * Merges a group of accumulator instances into one accumulator instance. This function must be - * implemented for datastream session window grouping aggregate and dataset grouping aggregate. - * - * @param accumulator the accumulator which will keep the merged aggregate results. It should - * be noted that the accumulator may contain the previous aggregated - * results. Therefore user should not replace or clean this instance in the - * custom merge method. - * @param its an [[java.lang.Iterable]] pointed to a group of accumulators that will be - * merged. - * - * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit - * }}} - * - * - * {{{ - * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for - * dataset grouping aggregate. - * - * @param accumulator the accumulator which needs to be reset - * - * def resetAccumulator(accumulator: ACC): Unit - * }}} - * - * - * @tparam T the type of the aggregation result - * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the - * aggregated values which are needed to compute an aggregation result. - * AggregateFunction represents its state using accumulator, thereby the state of the - * AggregateFunction must be put into the accumulator. - */ -abstract class AggregateFunction[T, ACC] extends UserDefinedFunction { - /** - * Creates and init the Accumulator for this [[AggregateFunction]]. - * - * @return the accumulator with the initial value - */ - def createAccumulator(): ACC - - /** - * Called every time when an aggregation result should be materialized. - * The returned value could be either an early and incomplete result - * (periodically emitted as data arrive) or the final result of the - * aggregation. - * - * @param accumulator the accumulator which contains the current - * aggregated results - * @return the aggregation result - */ - def getValue(accumulator: ACC): T - - /** - * Returns true if this AggregateFunction can only be applied in an OVER window. 
- * - * @return true if the AggregateFunction requires an OVER window, false otherwise. - */ - def requiresOver: Boolean = false - - /** - * Returns the TypeInformation of the AggregateFunction's result. - * - * @return The TypeInformation of the AggregateFunction's result or null if the result type - * should be automatically inferred. - */ - def getResultType: TypeInformation[T] = null - - /** - * Returns the TypeInformation of the AggregateFunction's accumulator. - * - * @return The TypeInformation of the AggregateFunction's accumulator or null if the - * accumulator type should be automatically inferred. - */ - def getAccumulatorType: TypeInformation[ACC] = null -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala deleted file mode 100644 index beeb686f035d7..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/FunctionContext.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.functions - -import java.io.File - -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.metrics.MetricGroup - -/** - * A FunctionContext allows to obtain global runtime information about the context in which the - * user-defined function is executed. The information include the metric group, - * the distributed cache files, and the global job parameters. - * - * @param context the runtime context in which the Flink Function is executed - */ -class FunctionContext(context: RuntimeContext) { - - /** - * Returns the metric group for this parallel subtask. - * - * @return metric group for this parallel subtask. - */ - def getMetricGroup: MetricGroup = context.getMetricGroup - - /** - * Gets the local temporary file copy of a distributed cache files. - * - * @param name distributed cache file name - * @return local temporary file copy of a distributed cache file. - */ - def getCachedFile(name: String): File = context.getDistributedCache.getFile(name) - - /** - * Gets the global job parameter value associated with the given key as a string. 
- * - * @param key key pointing to the associated value - * @param defaultValue default value which is returned in case global job parameter is null - * or there is no value associated with the given key - * @return (default) value associated with the given key - */ - def getJobParameter(key: String, defaultValue: String): String = { - val conf = context.getExecutionConfig.getGlobalJobParameters - if (conf != null && conf.toMap.containsKey(key)) { - conf.toMap.get(key) - } else { - defaultValue - } - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala deleted file mode 100644 index 4c01c1c7d9a2d..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.functions - -import org.apache.flink.api.common.functions.InvalidTypesException -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall} - -/** - * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one, - * or multiple scalar values to a new scalar value. - * - * The behavior of a [[ScalarFunction]] can be defined by implementing a custom evaluation - * method. An evaluation method must be declared publicly and named "eval". Evaluation methods - * can also be overloaded by implementing multiple methods named "eval". - * - * User-defined functions must have a default constructor and must be instantiable during runtime. - * - * By default the result type of an evaluation method is determined by Flink's type extraction - * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more - * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type - * can be manually defined by overriding [[getResultType()]]. - * - * Internally, the Table/SQL API code generation works with primitive values as much as possible. - * If a user-defined scalar function should not introduce much overhead during runtime, it is - * recommended to declare parameters and result types as primitive types instead of their boxed - * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. - */ -abstract class ScalarFunction extends UserDefinedFunction { - - /** - * Creates a call to a [[ScalarFunction]] in Scala Table API. 
- * - * @param params actual parameters of function - * @return [[Expression]] in form of a [[ScalarFunctionCall]] - */ - final def apply(params: Expression*): Expression = { - ScalarFunctionCall(this, params) - } - - // ---------------------------------------------------------------------------------------------- - - /** - * Returns the result type of the evaluation method with a given signature. - * - * This method needs to be overridden in case Flink's type extraction facilities are not - * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation - * method. Flink's type extraction facilities can handle basic types or - * simple POJOs but might be wrong for more complex, custom, or composite types. - * - * @param signature signature of the method the return type needs to be determined - * @return [[TypeInformation]] of result type or null if Flink should determine the type - */ - def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null - - /** - * Returns [[TypeInformation]] about the operands of the evaluation method with a given - * signature. - * - * In order to perform operand type inference in SQL (especially when NULL is used) it might be - * necessary to determine the parameter [[TypeInformation]] of an evaluation method. - * By default Flink's type extraction facilities are used for this but might be wrong for - * more complex, custom, or composite types. - * - * @param signature signature of the method the operand types need to be determined - * @return [[TypeInformation]] of operand types - */ - def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = { - signature.map { c => - try { - TypeExtractor.getForClass(c) - } catch { - case ite: InvalidTypesException => - throw new ValidationException( - s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " + - s"automatically determined. Please provide type information manually.") - } - } - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala deleted file mode 100644 index e892a4cddcc7e..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.functions - -import org.apache.flink.api.common.functions.InvalidTypesException -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.table.api.ValidationException -import org.apache.flink.util.Collector - -/** - * Base class for a user-defined table function (UDTF). A user-defined table functions works on - * zero, one, or multiple scalar values as input and returns multiple rows as output. - * - * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation - * method. An evaluation method must be declared publicly, not static and named "eval". - * Evaluation methods can also be overloaded by implementing multiple methods named "eval". - * - * User-defined functions must have a default constructor and must be instantiable during runtime. - * - * By default the result type of an evaluation method is determined by Flink's type extraction - * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more - * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type - * can be manually defined by overriding [[getResultType()]]. - * - * Internally, the Table/SQL API code generation works with primitive values as much as possible. - * If a user-defined table function should not introduce much overhead during runtime, it is - * recommended to declare parameters and result types as primitive types instead of their boxed - * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. - * - * Example: - * - * {{{ - * - * public class Split extends TableFunction { - * - * // implement an "eval" method with as many parameters as you want - * public void eval(String str) { - * for (String s : str.split(" ")) { - * collect(s); // use collect(...) to emit an output row - * } - * } - * - * // you can overload the eval method here ... - * } - * - * val tEnv: TableEnvironment = ... - * val table: Table = ... // schema: [a: String] - * - * // for Scala users - * val split = new Split() - * table.join(split('c) as ('s)).select('a, 's) - * - * // for Java users - * tEnv.registerFunction("split", new Split()) // register table function first - * table.join(new Table(tEnv, "split(a) as (s)")).select("a, s") - * - * // for SQL users - * tEnv.registerFunction("split", new Split()) // register table function first - * tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)") - * - * }}} - * - * @tparam T The type of the output row - */ -abstract class TableFunction[T] extends UserDefinedFunction { - - // ---------------------------------------------------------------------------------------------- - - /** - * Emit an output row. - * - * @param row the output row - */ - protected def collect(row: T): Unit = { - collector.collect(row) - } - - // ---------------------------------------------------------------------------------------------- - - /** - * The code generated collector used to emit row. - */ - private var collector: Collector[T] = _ - - /** - * Internal use. Sets the current collector. - */ - private[flink] final def setCollector(collector: Collector[T]): Unit = { - this.collector = collector - } - - // ---------------------------------------------------------------------------------------------- - - /** - * Returns the result type of the evaluation method with a given signature. 
- * - * This method needs to be overridden in case Flink's type extraction facilities are not - * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation - * method. Flink's type extraction facilities can handle basic types or - * simple POJOs but might be wrong for more complex, custom, or composite types. - * - * @return [[TypeInformation]] of result type or null if Flink should determine the type - */ - def getResultType: TypeInformation[T] = null - - /** - * Returns [[TypeInformation]] about the operands of the evaluation method with a given - * signature. - * - * In order to perform operand type inference in SQL (especially when NULL is used) it might be - * necessary to determine the parameter [[TypeInformation]] of an evaluation method. - * By default Flink's type extraction facilities are used for this but might be wrong for - * more complex, custom, or composite types. - * - * @param signature signature of the method the operand types need to be determined - * @return [[TypeInformation]] of operand types - */ - def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = { - signature.map { c => - try { - TypeExtractor.getForClass(c) - } catch { - case ite: InvalidTypesException => - throw new ValidationException( - s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " + - s"automatically determined. Please provide type information manually.") - } - } - } - -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala deleted file mode 100644 index 89ba0d4f364f9..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.functions - -import org.apache.flink.table.utils.EncodingUtils - -/** - * Base class for all user-defined functions such as scalar functions, table functions, - * or aggregation functions. - */ -abstract class UserDefinedFunction extends Serializable { - /** - * Setup method for user-defined function. It can be used for initialization work. - * - * By default, this method does nothing. - */ - @throws(classOf[Exception]) - def open(context: FunctionContext): Unit = {} - - /** - * Tear-down method for user-defined function. It can be used for clean up work. - * - * By default, this method does nothing. 
- */ - @throws(classOf[Exception]) - def close(): Unit = {} - - /** - * @return true if and only if a call to this function is guaranteed to always return - * the same result given the same parameters; true is assumed by default - * if user's function is not pure functional, like random(), date(), now()... - * isDeterministic must return false - */ - def isDeterministic: Boolean = true - - final def functionIdentifier: String = { - val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this))) - getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5) - } - - /** - * Returns the name of the UDF that is used for plan explain and logging. - */ - override def toString: String = getClass.getSimpleName - -} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala index 29de7e0f9d199..b70e582eff2c9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala @@ -29,6 +29,7 @@ import org.junit.Assert import scala.annotation.varargs import scala.collection.mutable +import scala.collection.JavaConversions._ import scala.io.Source case class SimplePojo(name: String, age: Int)