[FLINK-10689] [table] Port UDF interfaces to flink-table-common
This commit ports all UDF extension points to Java and relocates them
to flink-table-common. Projects that just want to provide functions
don't need to import flink-table and thus Scala anymore.

This closes apache#7059.
xueyumusic authored and twalthr committed Dec 3, 2018
1 parent ba10c91 commit 2690186
Showing 13 changed files with 489 additions and 514 deletions.
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -33,7 +33,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
115 changes: 115 additions & 0 deletions flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
* Base class for User-Defined Aggregates.
*
* <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
* methods. An {@link AggregateFunction} needs at least three methods:
* - createAccumulator,
* - accumulate, and
* - getValue.
*
* <p>There are a few other methods that are optional:
* - retract,
* - merge, and
* - resetAccumulator
*
* <p>All these methods must be declared publicly, not static, and named exactly as the names
* mentioned above. The methods createAccumulator and getValue are defined in the
* {@link AggregateFunction} class itself, while the other methods are explained below.
*
* <p>Processes the input values and updates the provided accumulator instance. The method
* accumulate can be overloaded with different custom types and arguments. An AggregateFunction
* requires at least one accumulate() method.
*
* <p>Retracts the input values from the accumulator instance. The current design assumes the
* inputs are the values that have been previously accumulated. The method retract can be
* overloaded with different custom types and arguments. This method must be implemented for
* bounded OVER aggregates over datastreams.
*
* <p>Merges a group of accumulator instances into one accumulator instance. This method must be
* implemented for datastream session window grouping aggregates and dataset grouping aggregates.
*
* <p>Resets the accumulator for this {@link AggregateFunction}. This method must be implemented
* for dataset grouping aggregates.
*
* @param <T> the type of the aggregation result
* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
* aggregated values which are needed to compute an aggregation result.
* An AggregateFunction represents its state using the accumulator; therefore, all
* state of the AggregateFunction must be put into the accumulator.
*/
@PublicEvolving
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {

/**
* Creates and initializes the accumulator for this {@link AggregateFunction}.
*
* @return the accumulator with the initial value
*/
public abstract ACC createAccumulator();

/**
* Called every time an aggregation result should be materialized.
* The returned value could be either an early and incomplete result
* (periodically emitted as data arrive) or the final result of the
* aggregation.
*
* @param accumulator the accumulator which contains the current
* aggregated results
* @return the aggregation result
*/
public abstract T getValue(ACC accumulator);

/**
* Returns true if this AggregateFunction can only be applied in an OVER window.
*
* @return true if the AggregateFunction requires an OVER window, false otherwise.
*/
public boolean requiresOver() {
return false;
}

/**
* Returns the TypeInformation of the AggregateFunction's result.
*
* @return The TypeInformation of the AggregateFunction's result or null if the result type
* should be automatically inferred.
*/
public TypeInformation<T> getResultType() {
return null;
}

/**
* Returns the TypeInformation of the AggregateFunction's accumulator.
*
* @return The TypeInformation of the AggregateFunction's accumulator or null if the
* accumulator type should be automatically inferred.
*/
public TypeInformation<ACC> getAccumulatorType() {
return null;
}
}
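
To make the contract above concrete, here is a minimal sketch of an aggregate built on the ported interface (hypothetical, not part of this commit; the LongSum class and its accumulator are invented for illustration):

import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical example: sums Long values using the ported interface.
public class LongSum extends AggregateFunction<Long, LongSum.SumAcc> {

	// The accumulator carries the complete state of the aggregation.
	public static class SumAcc {
		public long sum = 0L;
	}

	@Override
	public SumAcc createAccumulator() {
		return new SumAcc();
	}

	// Mandatory: processes an input value and updates the accumulator.
	public void accumulate(SumAcc acc, Long value) {
		if (value != null) {
			acc.sum += value;
		}
	}

	// Optional: required for bounded OVER aggregates over datastreams.
	public void retract(SumAcc acc, Long value) {
		if (value != null) {
			acc.sum -= value;
		}
	}

	// Optional: required e.g. for session window grouping aggregates.
	public void merge(SumAcc acc, Iterable<SumAcc> others) {
		for (SumAcc other : others) {
			acc.sum += other.sum;
		}
	}

	@Override
	public Long getValue(SumAcc acc) {
		return acc.sum;
	}
}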
80 changes: 80 additions & 0 deletions flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;

import java.io.File;

/**
* A FunctionContext allows obtaining global runtime information about the context in which the
* user-defined function is executed. The information includes the metric group,
* the distributed cache files, and the global job parameters.
*/
@PublicEvolving
public class FunctionContext {

private RuntimeContext context;

/**
* @param context the runtime context in which the Flink Function is executed
*/
public FunctionContext(RuntimeContext context) {
this.context = context;
}

/**
* Returns the metric group for this parallel subtask.
*
* @return metric group for this parallel subtask.
*/
public MetricGroup getMetricGroup() {
return context.getMetricGroup();
}

/**
* Gets the local temporary file copy of a distributed cache file.
*
* @param name distributed cache file name
* @return local temporary file copy of a distributed cache file.
*/
public File getCachedFile(String name) {
return context.getDistributedCache().getFile(name);
}

/**
* Gets the global job parameter value associated with the given key as a string.
*
* @param key key pointing to the associated value
* @param defaultValue default value which is returned in case the global job parameters are
* null or there is no value associated with the given key
* @return (default) value associated with the given key
*/
public String getJobParameter(String key, String defaultValue) {
GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
if (conf != null && conf.toMap().containsKey(key)) {
return conf.toMap().get(key);
} else {
return defaultValue;
}
}
}
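
As a usage sketch for FunctionContext (hypothetical, not part of this commit): a scalar function can read a global job parameter in its open() lifecycle method, which receives the FunctionContext. The open() method is declared on UserDefinedFunction, one of the ported files not shown in this excerpt, and the parameter name "hashcode_factor" is invented for the example:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

	private int factor = 12;

	@Override
	public void open(FunctionContext context) throws Exception {
		// Falls back to "12" if the job parameter is unset or the
		// global job parameters are null.
		factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
	}

	public int eval(String s) {
		return s.hashCode() * factor;
	}
}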
95 changes: 95 additions & 0 deletions flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.ValidationException;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
* or multiple scalar values to a new scalar value.
*
* <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
* method. An evaluation method must be declared publicly and named "eval". Evaluation methods
* can also be overloaded by implementing multiple methods named "eval".
*
* <p>User-defined functions must have a default constructor and must be instantiable during runtime.
*
* <p>By default the result type of an evaluation method is determined by Flink's type extraction
* facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
* complex, custom, or composite types. In these cases {@link TypeInformation} of the result type
* can be manually defined by overriding {@link ScalarFunction#getResultType}.
*
* <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
* If a user-defined scalar function should not introduce much overhead during runtime, it is
* recommended to declare parameters and result types as primitive types instead of their boxed
* classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
*/
@PublicEvolving
public abstract class ScalarFunction extends UserDefinedFunction {

/**
* Returns the result type of the evaluation method with a given signature.
*
* <p>This method needs to be overridden in case Flink's type extraction facilities are not
* sufficient to extract the {@link TypeInformation} based on the return type of the evaluation
* method. Flink's type extraction facilities can handle basic types or
* simple POJOs but might be wrong for more complex, custom, or composite types.
*
* @param signature signature of the method for which the return type needs to be determined
* @return {@link TypeInformation} of result type or null if Flink should determine the type
*/
public TypeInformation<?> getResultType(Class<?>[] signature) {
return null;
}

/**
* Returns {@link TypeInformation} about the operands of the evaluation method with a given
* signature.
*
* <p>In order to perform operand type inference in SQL (especially when NULL is used), it might be
* necessary to determine the parameter {@link TypeInformation} of an evaluation method.
* By default Flink's type extraction facilities are used for this but might be wrong for
* more complex, custom, or composite types.
*
* @param signature signature of the method for which the operand types need to be determined
* @return {@link TypeInformation} of operand types
*/
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
List<TypeInformation<?>> typeList = Arrays.stream(signature).map(c -> {
try {
return TypeExtractor.getForClass(c);
} catch (InvalidTypesException e) {
throw new ValidationException(
"Parameter types of scalar function " + this.getClass().getCanonicalName() + " cannot be " +
"automatically determined. Please provide type information manually.");
}
}).collect(Collectors.toList());

return typeList.toArray(new TypeInformation<?>[0]);
}
}
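
And a sketch of overriding getResultType() for a case the type extractor cannot infer on its own (hypothetical, not part of this commit; TimestampModifier is modeled on the Flink documentation's example):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

public class TimestampModifier extends ScalarFunction {

	// Rounds a millisecond timestamp down to the start of its second.
	public long eval(long t) {
		return t - (t % 1000);
	}

	@Override
	public TypeInformation<?> getResultType(Class<?>[] signature) {
		// Interpret the primitive long result as a SQL timestamp.
		return Types.SQL_TIMESTAMP;
	}
}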
[Diff truncated: the remaining 9 changed files are not shown.]
