Skip to content

Commit

Permalink
[FLINK-10689] [table] Improve docs and fix bugs of ported classes
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Dec 3, 2018
1 parent 2690186 commit 98437c7
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 102 deletions.
5 changes: 0 additions & 5 deletions flink-end-to-end-tests/flink-sql-client-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<scope>provided</scope>
</dependency>

<!-- The following dependencies are for connector/format sql-jars that
we copy using the maven-dependency-plugin. When extending the test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,89 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
* Base class for User-Defined Aggregates.
* Base class for user-defined aggregates.
*
* <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
* methods. An {@link AggregateFunction} needs at least three methods:
* - createAccumulator,
* - accumulate, and
* - getValue.
* - <code>createAccumulator</code>,
* - <code>accumulate</code>, and
* - <code>getValue</code>.
*
* <p>There are a few other methods that can be optional to have:
* - retract,
* - merge, and
* - resetAccumulator
* - <code>retract</code>,
* - <code>merge</code>, and
* - <code>resetAccumulator</code>.
*
* <p>All these methods must be declared publicly, not static and named exactly as the names
* mentioned above. The methods createAccumulator and getValue are defined in the
* {@link AggregateFunction} functions, while other methods are explained below.
* <p>All these methods must be declared publicly, not static, and named exactly as the names
* mentioned above. The methods {@link #createAccumulator()} and {@link #getValue} are defined in
* the {@link AggregateFunction} functions, while other methods are explained below.
*
* <p>Processes the input values and update the provided accumulator instance. The method
* <pre>
* {@code
* Processes the input values and update the provided accumulator instance. The method
* accumulate can be overloaded with different custom types and arguments. An AggregateFunction
* requires at least one accumulate() method.
*
* param: accumulator the accumulator which contains the current aggregated results
* param: [user defined inputs] the input value (usually obtained from a new arrived data).
*
* <p>Retracts the input values from the accumulator instance. The current design assumes the
* public void accumulate(ACC accumulator, [user defined inputs])
* }
* </pre>
*
* <pre>
* {@code
* Retracts the input values from the accumulator instance. The current design assumes the
* inputs are the values that have been previously accumulated. The method retract can be
* overloaded with different custom types and arguments. This function must be implemented for
* datastream bounded over aggregate.
* data stream bounded OVER aggregates.
*
* param: accumulator the accumulator which contains the current aggregated results
* param: [user defined inputs] the input value (usually obtained from a new arrived data).
*
* public void retract(ACC accumulator, [user defined inputs])
* }
* </pre>
*
* <pre>
* {@code
* Merges a group of accumulator instances into one accumulator instance. This function must be
* implemented for data stream session window grouping aggregates and data set grouping aggregates.
*
* param: accumulator the accumulator which will keep the merged aggregate results. It should
* be noted that the accumulator may contain the previous aggregated
* results. Therefore user should not replace or clean this instance in the
* custom merge method.
* param: its an java.lang.Iterable pointed to a group of accumulators that will be
* merged.
*
* public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
* }
* </pre>
*
* <p>Merges a group of accumulator instances into one accumulator instance. This function must be
* implemented for datastream session window grouping aggregate and dataset grouping aggregate.
* <pre>
* {@code
* Resets the accumulator for this AggregateFunction. This function must be implemented for
* data set grouping aggregates.
*
* <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
* dataset grouping aggregate.
* param: accumulator the accumulator which needs to be reset
*
* public void resetAccumulator(ACC accumulator)
* }
* </pre>
*
* @param T the type of the aggregation result
* @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
* aggregated values which are needed to compute an aggregation result.
* AggregateFunction represents its state using accumulator, thereby the state of the
* AggregateFunction must be put into the accumulator.
* @param <T> the type of the aggregation result
* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
* aggregated values which are needed to compute an aggregation result.
* AggregateFunction represents its state using accumulator, thereby the state of the
* AggregateFunction must be put into the accumulator.
*/
@PublicEvolving
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {

/**
* Creates and init the Accumulator for this {@link AggregateFunction}.
* Creates and initializes the accumulator for this {@link AggregateFunction}. The accumulator
* is used to keep the aggregated values which are needed to compute an aggregation result.
*
* @return the accumulator with the initial value
*/
Expand All @@ -85,29 +123,31 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
/**
 * Computes and returns the final aggregation result from the given accumulator.
 *
 * @param accumulator the accumulator which contains the current aggregated results
 * @return the aggregation result
 */
public abstract T getValue(ACC accumulator);

/**
* Returns true if this AggregateFunction can only be applied in an OVER window.
* Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
* OVER window.
*
* @return true if the AggregateFunction requires an OVER window, false otherwise.
* @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
* <code>false</code> otherwise.
*/
public boolean requiresOver() {
// By default the function is not restricted to OVER windows; subclasses that
// only make sense in an OVER window override this to return true.
return false;
}

/**
* Returns the TypeInformation of the AggregateFunction's result.
* Returns the {@link TypeInformation} of the {@link AggregateFunction}'s result.
*
* @return The TypeInformation of the AggregateFunction's result or null if the result type
* should be automatically inferred.
* @return The {@link TypeInformation} of the {@link AggregateFunction}'s result or
* <code>null</code> if the result type should be automatically inferred.
*/
public TypeInformation<T> getResultType() {
// null is the sentinel meaning "let Flink infer the result type automatically".
return null;
}

/**
* Returns the TypeInformation of the AggregateFunction's accumulator.
* Returns the {@link TypeInformation} of the {@link AggregateFunction}'s accumulator.
*
* @return The TypeInformation of the AggregateFunction's accumulator or null if the
* accumulator type should be automatically inferred.
* @return The {@link TypeInformation} of the {@link AggregateFunction}'s accumulator or
* <code>null</code> if the accumulator type should be automatically inferred.
*/
public TypeInformation<ACC> getAccumulatorType() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,28 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;

import java.io.File;

/**
* A FunctionContext allows to obtain global runtime information about the context in which the
* user-defined function is executed. The information include the metric group,
* the distributed cache files, and the global job parameters.
* A {@link FunctionContext} allows to obtain global runtime information about the context in which the
* user-defined function is executed.
*
* <p>The information includes the metric group, distributed cache files, and global job parameters.
*/
@PublicEvolving
public class FunctionContext {

/**
* @param context the runtime context in which the Flink Function is executed
*/
private RuntimeContext context;

/**
 * Wraps the underlying {@link RuntimeContext}.
 *
 * @param context the runtime context in which Flink's {@link Function} is executed.
 */
public FunctionContext(RuntimeContext context) {
// Stored for later lookups of metrics, cached files, and global job parameters.
this.context = context;
}
Expand Down Expand Up @@ -70,7 +74,7 @@ public File getCachedFile(String name) {
* @return (default) value associated with the given key
*/
public String getJobParameter(String key, String defaultValue) {
GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
final GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
if (conf != null && conf.toMap().containsKey(key)) {
return conf.toMap().get(key);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.ValidationException;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
* or multiple scalar values to a new scalar value.
*
* <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
* method. An evaluation method must be declared publicly and named "eval". Evaluation methods
* can also be overloaded by implementing multiple methods named "eval".
* method. An evaluation method must be declared publicly and named <code>eval</code>. Evaluation
* methods can also be overloaded by implementing multiple methods named <code>eval</code>.
*
* <p>User-defined functions must have a default constructor and must be instantiable during runtime.
*
Expand All @@ -46,7 +42,8 @@
* <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
* If a user-defined scalar function should not introduce much overhead during runtime, it is
* recommended to declare parameters and result types as primitive types instead of their boxed
* classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
* classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
* to <code>long</code>.
*/
@PublicEvolving
public abstract class ScalarFunction extends UserDefinedFunction {
Expand All @@ -60,7 +57,8 @@ public abstract class ScalarFunction extends UserDefinedFunction {
* simple POJOs but might be wrong for more complex, custom, or composite types.
*
* @param signature signature of the method the return type needs to be determined
* @return {@link TypeInformation} of result type or null if Flink should determine the type
* @return {@link TypeInformation} of result type or <code>null</code> if Flink should
* determine the type
*/
public TypeInformation<?> getResultType(Class<?>[] signature) {
return null;
Expand All @@ -70,26 +68,25 @@ public TypeInformation<?> getResultType(Class<?>[] signature) {
* Returns {@link TypeInformation} about the operands of the evaluation method with a given
* signature.
*
* <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
* necessary to determine the parameter {@link TypeInformation} of an evaluation method.
* By default Flink's type extraction facilities are used for this but might be wrong for
* more complex, custom, or composite types.
* <p>In order to perform operand type inference in SQL (especially when <code>NULL</code> is
* used) it might be necessary to determine the parameter {@link TypeInformation} of an
* evaluation method. By default Flink's type extraction facilities are used for this but might
* be wrong for more complex, custom, or composite types.
*
* @param signature signature of the method the operand types need to be determined
* @return {@link TypeInformation} of operand types
*/
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {

List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
for (int i = 0; i < signature.length; i++) {
try {
return TypeExtractor.getForClass(c);
types[i] = TypeExtractor.getForClass(signature[i]);
} catch (InvalidTypesException e) {
throw new ValidationException(
"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
"automatically determined. Please provide type information manually.");
"Parameter types of scalar function " + this.getClass().getCanonicalName() +
" cannot be automatically determined. Please provide type information manually.");
}
}).collect(Collectors.toList());

return typeList.toArray(new TypeInformation<?>[0]);
}
return types;
}
}
Loading

0 comments on commit 98437c7

Please sign in to comment.