From 2e62ae70afe87057aa633703293ab82d7620eda2 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Fri, 10 Jan 2020 13:27:17 +0100 Subject: [PATCH] [FLINK-15487][table-common] Make TypeInference mandatory for function definitions This makes TypeInference mandatory for function definitions and updates FunctionDefinition, UserDefinedFunction, and ScalarFunction to FLIP-65. This closes #10928. --- .../api/internal/TableEnvironmentImpl.java | 1 + .../resolver/ExpressionResolver.java | 25 ++++- .../rules/ResolveCallByArgumentsRule.java | 31 +----- .../resolver/rules/ResolverRule.java | 6 ++ .../utils/OperationTreeBuilder.java | 23 ++++- .../flink/table/functions/TestGenericUDF.java | 8 ++ .../flink/table/functions/TestSimpleUDF.java | 8 ++ .../table/functions/AggregateFunction.java | 8 ++ .../AggregateFunctionDefinition.java | 15 ++- .../table/functions/AsyncTableFunction.java | 8 ++ .../functions/BuiltInFunctionDefinition.java | 35 ++++--- .../table/functions/FunctionDefinition.java | 16 +++ .../flink/table/functions/ScalarFunction.java | 98 ++++++++++++++----- .../functions/ScalarFunctionDefinition.java | 15 ++- .../functions/TableAggregateFunction.java | 8 ++ .../TableAggregateFunctionDefinition.java | 15 ++- .../flink/table/functions/TableFunction.java | 8 ++ .../functions/TableFunctionDefinition.java | 15 ++- .../table/functions/UserDefinedFunction.java | 30 ++++++ .../types/inference/TypeInferenceUtil.java | 8 +- .../inference/InputTypeStrategiesTest.java | 2 +- .../utils/FunctionDefinitionMock.java | 9 ++ .../expressions/CallExpressionResolver.java | 16 +-- .../DeclarativeAggregateFunction.java | 7 ++ .../TypeInferenceOperandChecker.java | 2 +- .../table/planner/utils/ShortcutUtils.java | 19 ++++ .../plan/batch/sql/SetOperatorsTest.xml | 4 +- .../planner/plan/batch/table/CalcTest.xml | 6 +- .../plan/batch/table/CorrelateTest.xml | 24 ++--- .../planner/plan/batch/table/JoinTest.xml | 54 +++++----- .../plan/batch/table/SetOperatorsTest.xml | 4 +- .../CorrelateStringExpressionTest.xml | 32 +++--- .../PushFilterIntoTableSourceScanRuleTest.xml | 44 +-------- .../logical/RewriteIntersectAllRuleTest.xml | 8 +- .../rules/logical/RewriteMinusAllRuleTest.xml | 8 +- .../plan/stream/sql/SetOperatorsTest.xml | 4 +- .../planner/plan/stream/table/CalcTest.xml | 6 +- .../plan/stream/table/ColumnFunctionsTest.xml | 10 +- .../plan/stream/table/CorrelateTest.xml | 44 ++++----- .../plan/stream/table/OverWindowTest.xml | 18 ++-- .../plan/stream/table/TableAggregateTest.xml | 2 +- .../table/api/internal/TableEnvImpl.scala | 1 + 42 files changed, 450 insertions(+), 255 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 18adb10b2ecf9..4233f5542850f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -171,6 +171,7 @@ protected TableEnvironmentImpl( this.operationTreeBuilder = OperationTreeBuilder.create( tableConfig, functionCatalog, + catalogManager.getDataTypeFactory(), path -> { try { UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 78a18a7be1d9d..07fd7c845baa0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.OverWindow; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; @@ -104,6 +105,8 @@ public static List getAllResolverRules() { private final FunctionLookup functionLookup; + private final DataTypeFactory typeFactory; + private final PostResolverFactory postResolverFactory = new PostResolverFactory(); private final Map localReferences; @@ -114,6 +117,7 @@ private ExpressionResolver( TableConfig config, TableReferenceLookup tableLookup, FunctionLookup functionLookup, + DataTypeFactory typeFactory, FieldReferenceLookup fieldLookup, List localOverWindows, List localReferences) { @@ -121,6 +125,7 @@ private ExpressionResolver( this.tableLookup = Preconditions.checkNotNull(tableLookup); this.fieldLookup = Preconditions.checkNotNull(fieldLookup); this.functionLookup = Preconditions.checkNotNull(functionLookup); + this.typeFactory = Preconditions.checkNotNull(typeFactory); this.localReferences = localReferences.stream().collect(Collectors.toMap( LocalReferenceExpression::getName, @@ -136,6 +141,7 @@ private ExpressionResolver( * @param config general configuration * @param tableCatalog a way to lookup a table reference by name * @param functionLookup a way to lookup call by name + * @param typeFactory a way to lookup and create data types * @param inputs inputs to use for field resolution * @return builder for resolver */ @@ -143,8 +149,14 @@ public static ExpressionResolverBuilder resolverFor( TableConfig config, TableReferenceLookup tableCatalog, FunctionLookup functionLookup, + DataTypeFactory typeFactory, QueryOperation... inputs) { - return new ExpressionResolverBuilder(inputs, config, tableCatalog, functionLookup); + return new ExpressionResolverBuilder( + inputs, + config, + tableCatalog, + functionLookup, + typeFactory); } /** @@ -268,6 +280,11 @@ public FunctionLookup functionLookup() { return functionLookup; } + @Override + public DataTypeFactory typeFactory() { + return typeFactory; + } + @Override public PostResolverFactory postResolutionFactory() { return postResolverFactory; @@ -357,6 +374,7 @@ public static class ExpressionResolverBuilder { private final List queryOperations; private final TableReferenceLookup tableCatalog; private final FunctionLookup functionLookup; + private final DataTypeFactory typeFactory; private List logicalOverWindows = new ArrayList<>(); private List localReferences = new ArrayList<>(); @@ -364,11 +382,13 @@ private ExpressionResolverBuilder( QueryOperation[] queryOperations, TableConfig config, TableReferenceLookup tableCatalog, - FunctionLookup functionLookup) { + FunctionLookup functionLookup, + DataTypeFactory typeFactory) { this.config = config; this.queryOperations = Arrays.asList(queryOperations); this.tableCatalog = tableCatalog; this.functionLookup = functionLookup; + this.typeFactory = typeFactory; } public ExpressionResolverBuilder withOverWindows(List windows) { @@ -386,6 +406,7 @@ public ExpressionResolver build() { config, tableCatalog, functionLookup, + typeFactory, new FieldReferenceLookup(queryOperations), logicalOverWindows, localReferences); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index 1bad5ad9bafa1..04b848ce73c03 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -24,7 +24,6 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; @@ -170,8 +169,9 @@ private List flattenCompositeType(ResolvedExpression composi private Optional getOptionalTypeInference(FunctionDefinition definition) { if (definition instanceof BuiltInFunctionDefinition) { final BuiltInFunctionDefinition builtInDefinition = (BuiltInFunctionDefinition) definition; - if (builtInDefinition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) { - return Optional.of(builtInDefinition.getTypeInference()); + final TypeInference inference = builtInDefinition.getTypeInference(resolutionContext.typeFactory()); + if (inference.getOutputTypeStrategy() != TypeStrategies.MISSING) { + return Optional.of(inference); } } return Optional.empty(); @@ -187,7 +187,7 @@ private ResolvedExpression runTypeInference( final Result inferenceResult = TypeInferenceUtil.runTypeInference( inference, new TableApiCallContext( - new UnsupportedDataTypeFactory(), + resolutionContext.typeFactory(), name, unresolvedCall.getFunctionDefinition(), resolvedArgs), @@ -275,29 +275,6 @@ private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definit // -------------------------------------------------------------------------------------------- - private static class UnsupportedDataTypeFactory implements DataTypeFactory { - - @Override - public Optional createDataType(String name) { - throw new TableException("Data type factory is not supported yet."); - } - - @Override - public Optional createDataType(UnresolvedIdentifier identifier) { - throw new TableException("Data type factory is not supported yet."); - } - - @Override - public DataType createDataType(Class clazz) { - throw new TableException("Data type factory is not supported yet."); - } - - @Override - public DataType createRawDataType(Class clazz) { - throw new TableException("Data type factory is not supported yet."); - } - } - private static class TableApiCallContext implements CallContext { private final DataTypeFactory typeFactory; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java index d8525c855f75d..6b47c95ce3488 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.LocalReferenceExpression; @@ -68,6 +69,11 @@ interface ResolutionContext { */ FunctionLookup functionLookup(); + /** + * Access to {@link DataTypeFactory}. + */ + DataTypeFactory typeFactory(); + /** * Enables the creation of resolved expressions for transformations after the actual resolution. */ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java index 05fa8b3b6872c..082a6d804a05d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java @@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionUtils; @@ -88,6 +89,7 @@ public final class OperationTreeBuilder { private final TableConfig config; private final FunctionLookup functionCatalog; + private final DataTypeFactory typeFactory; private final TableReferenceLookup tableReferenceLookup; private final LookupCallResolver lookupResolver; @@ -104,6 +106,7 @@ public final class OperationTreeBuilder { private OperationTreeBuilder( TableConfig config, FunctionLookup functionLookup, + DataTypeFactory typeFactory, TableReferenceLookup tableReferenceLookup, ProjectionOperationFactory projectionOperationFactory, SortOperationFactory sortOperationFactory, @@ -113,6 +116,7 @@ private OperationTreeBuilder( JoinOperationFactory joinOperationFactory) { this.config = config; this.functionCatalog = functionLookup; + this.typeFactory = typeFactory; this.tableReferenceLookup = tableReferenceLookup; this.projectionOperationFactory = projectionOperationFactory; this.sortOperationFactory = sortOperationFactory; @@ -126,11 +130,13 @@ private OperationTreeBuilder( public static OperationTreeBuilder create( TableConfig config, FunctionLookup functionCatalog, + DataTypeFactory typeFactory, TableReferenceLookup tableReferenceLookup, boolean isStreamingMode) { return new OperationTreeBuilder( config, functionCatalog, + typeFactory, tableReferenceLookup, new ProjectionOperationFactory(), new SortOperationFactory(isStreamingMode), @@ -178,6 +184,7 @@ private QueryOperation projectInternal( config, tableReferenceLookup, functionCatalog, + typeFactory, child) .withOverWindows(overWindows) .build(); @@ -246,6 +253,7 @@ public QueryOperation windowAggregate( config, tableReferenceLookup, functionCatalog, + typeFactory, child) .withLocalReferences( new LocalReferenceExpression( @@ -296,6 +304,7 @@ public QueryOperation windowAggregate( config, tableReferenceLookup, functionCatalog, + typeFactory, child) .withLocalReferences( new LocalReferenceExpression( @@ -343,6 +352,7 @@ public QueryOperation join( config, tableReferenceLookup, functionCatalog, + typeFactory, left, right) .build(); @@ -376,6 +386,7 @@ public Expression resolveExpression(Expression expression, QueryOperation... tab config, tableReferenceLookup, functionCatalog, + typeFactory, tableOperation).build(); return resolveSingleExpression(expression, resolver); @@ -545,11 +556,11 @@ private AggregateWithAlias(UnresolvedCallExpression aggregate, List alia private static class ExtractAliasAndAggregate extends ApiExpressionDefaultVisitor { // need this flag to validate alias, i.e., the length of alias and function result type should be same. - private boolean isRowbasedAggregate = false; - private ExpressionResolver resolver = null; + private boolean isRowBasedAggregate; + private ExpressionResolver resolver; - public ExtractAliasAndAggregate(boolean isRowbasedAggregate, ExpressionResolver resolver) { - this.isRowbasedAggregate = isRowbasedAggregate; + public ExtractAliasAndAggregate(boolean isRowBasedAggregate, ExpressionResolver resolver) { + this.isRowBasedAggregate = isRowBasedAggregate; this.resolver = resolver; } @@ -596,7 +607,7 @@ private Optional getAggregate( } else { ResolvedExpression resolvedExpression = resolver.resolve(Collections.singletonList(unresolvedCall)).get(0); - validateAlias(aliases, resolvedExpression, isRowbasedAggregate); + validateAlias(aliases, resolvedExpression, isRowBasedAggregate); fieldNames = aliases; } return Optional.of(new AggregateWithAlias(unresolvedCall, fieldNames)); @@ -681,6 +692,7 @@ public QueryOperation windowTableAggregate( config, tableReferenceLookup, functionCatalog, + typeFactory, child) .withLocalReferences( new LocalReferenceExpression( @@ -781,6 +793,7 @@ private ExpressionResolver getResolver(QueryOperation child) { config, tableReferenceLookup, functionCatalog, + typeFactory, child) .build(); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestGenericUDF.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestGenericUDF.java index 1821245c8d0a7..5519e3062ae20 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestGenericUDF.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestGenericUDF.java @@ -18,6 +18,9 @@ package org.apache.flink.table.functions; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; + import java.util.Set; /** @@ -30,6 +33,11 @@ public FunctionKind getKind() { return FunctionKind.SCALAR; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new UnsupportedOperationException(); + } + @Override public Set getRequirements() { return null; diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestSimpleUDF.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestSimpleUDF.java index 13bb017f85c06..4f4c4eeac3df5 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestSimpleUDF.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/TestSimpleUDF.java @@ -18,6 +18,9 @@ package org.apache.flink.table.functions; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; + import java.util.Set; /** @@ -30,6 +33,11 @@ public FunctionKind getKind() { return FunctionKind.SCALAR; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new UnsupportedOperationException(); + } + @Override public Set getRequirements() { return null; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java index 94b96f3417f5b..6d0f30a357f63 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java @@ -19,6 +19,9 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import java.util.Collections; import java.util.HashSet; @@ -144,6 +147,11 @@ public final FunctionKind getKind() { return FunctionKind.AGGREGATE; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Aggregate functions are not updated to the new type system yet."); + } + @Override public Set getRequirements() { final HashSet requirements = new HashSet<>(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java index df01db0d1e912..3cbcec5c3bde9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java @@ -18,19 +18,23 @@ package org.apache.flink.table.functions; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Preconditions; import java.util.Objects; import java.util.Set; /** - * The function definition of an user-defined aggregate function. + * A "marker" function definition of an user-defined aggregate function that uses the old type system + * stack. * *

This class can be dropped once we introduce a new type inference. */ -@PublicEvolving +@Internal public final class AggregateFunctionDefinition implements FunctionDefinition { private final String name; @@ -70,6 +74,11 @@ public FunctionKind getKind() { return FunctionKind.AGGREGATE; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Functions implemented for the old type system are not supported."); + } + @Override public Set getRequirements() { return aggregateFunction.getRequirements(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java index 3a8c879e209e7..b8e6707e8e196 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java @@ -20,6 +20,9 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import java.util.concurrent.CompletableFuture; @@ -106,4 +109,9 @@ public TypeInformation getResultType() { public final FunctionKind getKind() { return FunctionKind.ASYNC_TABLE; } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Async table functions are not updated to the new type system yet."); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index 8fdb280c0c363..a4ad6f4950c9c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -19,13 +19,14 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeStrategy; import org.apache.flink.util.Preconditions; -import java.util.List; +import java.util.Arrays; /** * Definition of a built-in function. It enables unique identification across different @@ -54,16 +55,15 @@ private BuiltInFunctionDefinition( this.typeInference = Preconditions.checkNotNull(typeInference, "Type inference must not be null."); } - public String getName() { - return name; - } - /** - * Currently, the type inference is just exposed here. In the future, function definition will - * require it. + * Builder for configuring and creating instances of {@link BuiltInFunctionDefinition}. */ - public TypeInference getTypeInference() { - return typeInference; + public static BuiltInFunctionDefinition.Builder newBuilder() { + return new BuiltInFunctionDefinition.Builder(); + } + + public String getName() { + return name; } @Override @@ -71,6 +71,11 @@ public FunctionKind getKind() { return kind; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return typeInference; + } + @Override public String toString() { return name; @@ -81,13 +86,13 @@ public String toString() { /** * Builder for fluent definition of built-in functions. */ - public static class Builder { + public static final class Builder { private String name; private FunctionKind kind; - private TypeInference.Builder typeInferenceBuilder = new TypeInference.Builder(); + private TypeInference.Builder typeInferenceBuilder = TypeInference.newBuilder(); public Builder() { // default constructor to allow a fluent definition @@ -103,13 +108,13 @@ public Builder kind(FunctionKind kind) { return this; } - public Builder namedArguments(List argumentNames) { - this.typeInferenceBuilder.namedArguments(argumentNames); + public Builder namedArguments(String... argumentNames) { + this.typeInferenceBuilder.namedArguments(Arrays.asList(argumentNames)); return this; } - public Builder typedArguments(List argumentTypes) { - this.typeInferenceBuilder.typedArguments(argumentTypes); + public Builder typedArguments(DataType... argumentTypes) { + this.typeInferenceBuilder.typedArguments(Arrays.asList(argumentTypes)); return this; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java index 16379285f2e28..44f1c9da86ab6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionDefinition.java @@ -19,6 +19,8 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import java.util.Collections; import java.util.Set; @@ -40,6 +42,20 @@ public interface FunctionDefinition { */ FunctionKind getKind(); + /** + * Returns the logic for performing type inference of a call to this function definition. + * + *

The type inference process is responsible for inferring unknown types of input arguments, + * validating input arguments, and producing result types. The type inference process happens + * independent of a function body. The output of the type inference is used to search for a + * corresponding runtime implementation. + * + *

Instances of type inference can be created by using {@link TypeInference#newBuilder()}. + * + *

See {@link BuiltInFunctionDefinitions} for concrete usage examples. + */ + TypeInference getTypeInference(DataTypeFactory typeFactory); + /** * Returns the set of requirements this definition demands. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java index cfb4d143f2e02..59a351e60fadc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java @@ -22,7 +22,12 @@ import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.extraction.TypeInferenceExtractor; +import org.apache.flink.table.types.inference.TypeInference; /** * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one, @@ -32,18 +37,57 @@ * method. An evaluation method must be declared publicly and named eval. Evaluation * methods can also be overloaded by implementing multiple methods named eval. * - *

User-defined functions must have a default constructor and must be instantiable during runtime. + *

By default, input and output data types are automatically extracted using reflection. If the + * reflective information is not sufficient, it can be supported and enriched with {@link DataTypeHint} + * and {@link FunctionHint} annotations. + * + *

The following examples show how to specify a scalar function: + * + *

+ * {@code
+ *   // a function that accepts two INT arguments and computes a sum
+ *   class SumFunction extends ScalarFunction {
+ *     public int eval(int a, int b) {
+ *       return a + b;
+ *     }
+ *   }
+ *
+ *   // a function that accepts either INT or BOOLEAN and computes a STRING
+ *   class StringifyFunction extends ScalarFunction {
+ *     public String eval(int i) {
+ *       return String.valueOf(i);
+ *     }
+ *     public String eval(boolean b) {
+ *       return String.valueOf(b);
+ *     }
+ *   }
+ *
+ *   // a function that accepts either INT or BOOLEAN and computes a STRING using function hints
+ *   @FunctionHint(input = [@DataTypeHint("INT")])
+ *   @FunctionHint(input = [@DataTypeHint("BOOLEAN")])
+ *   class StringifyFunction extends ScalarFunction {
+ *     public String eval(Object o) {
+ *       return o.toString();
+ *     }
+ *   }
+ *
+ *   // a function that accepts any data type as argument and computes a STRING
+ *   class StringifyFunction extends ScalarFunction {
+ *     public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ *       return o.toString();
+ *     }
+ *   }
  *
- * 

By default the result type of an evaluation method is determined by Flink's type extraction - * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more - * complex, custom, or composite types. In these cases {@link TypeInformation} of the result type - * can be manually defined by overriding {@link ScalarFunction#getResultType}. + * // a function that accepts an arbitrary number of BIGINT values and computes a DECIMAL(10, 4) + * class SumFunction extends ScalarFunction { + * public @DataTypeHint("DECIMAL(10, 4)") BigDecimal eval(long... values) { + * // ... + * } + * } + * } + *

* - *

Internally, the Table/SQL API code generation works with primitive values as much as possible. - * If a user-defined scalar function should not introduce much overhead during runtime, it is - * recommended to declare parameters and result types as primitive types instead of their boxed - * classes. DATE/TIME is equal to int, TIMESTAMP is equal - * to long. + *

User-defined functions must have a default constructor and must be instantiable during runtime. */ @PublicEvolving public abstract class ScalarFunction extends UserDefinedFunction { @@ -51,15 +95,14 @@ public abstract class ScalarFunction extends UserDefinedFunction { /** * Returns the result type of the evaluation method with a given signature. * - *

This method needs to be overridden in case Flink's type extraction facilities are not - * sufficient to extract the {@link TypeInformation} based on the return type of the evaluation - * method. Flink's type extraction facilities can handle basic types or - * simple POJOs but might be wrong for more complex, custom, or composite types. - * - * @param signature signature of the method the return type needs to be determined - * @return {@link TypeInformation} of result type or null if Flink should - * determine the type + * @deprecated This method uses the old type system and is based on the old reflective extraction + * logic. The method will be removed in future versions and is only called when using + * the deprecated {@code TableEnvironment.registerFunction(...)} method. The new reflective + * extraction logic (possibly enriched with {@link DataTypeHint} and {@link FunctionHint}) + * should be powerful enough to cover most use cases. For advanced users, it is possible + * to override {@link UserDefinedFunction#getTypeInference(DataTypeFactory)}. */ + @Deprecated public TypeInformation getResultType(Class[] signature) { return null; } @@ -68,14 +111,14 @@ public TypeInformation getResultType(Class[] signature) { * Returns {@link TypeInformation} about the operands of the evaluation method with a given * signature. * - *

In order to perform operand type inference in SQL (especially when NULL is - * used) it might be necessary to determine the parameter {@link TypeInformation} of an - * evaluation method. By default Flink's type extraction facilities are used for this but might - * be wrong for more complex, custom, or composite types. - * - * @param signature signature of the method the operand types need to be determined - * @return {@link TypeInformation} of operand types + * @deprecated This method uses the old type system and is based on the old reflective extraction + * logic. The method will be removed in future versions and is only called when using + * the deprecated {@code TableEnvironment.registerFunction(...)} method. The new reflective + * extraction logic (possibly enriched with {@link DataTypeHint} and {@link FunctionHint}) + * should be powerful enough to cover most use cases. For advanced users, it is possible + * to override {@link UserDefinedFunction#getTypeInference(DataTypeFactory)}. */ + @Deprecated public TypeInformation[] getParameterTypes(Class[] signature) { final TypeInformation[] types = new TypeInformation[signature.length]; for (int i = 0; i < signature.length; i++) { @@ -94,4 +137,9 @@ public TypeInformation[] getParameterTypes(Class[] signature) { public final FunctionKind getKind() { return FunctionKind.SCALAR; } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return TypeInferenceExtractor.forScalarFunction(typeFactory, getClass()); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java index 279d436fcc80d..9744dcbcaf27e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java @@ -18,18 +18,22 @@ package org.apache.flink.table.functions; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Preconditions; import java.util.Objects; import java.util.Set; /** - * The function definition of an user-defined scalar function. + * A "marker" function definition of a user-defined scalar function that uses the old type system + * stack. * *

This class can be dropped once we introduce a new type inference. */ -@PublicEvolving +@Internal public final class ScalarFunctionDefinition implements FunctionDefinition { private final String name; @@ -53,6 +57,11 @@ public FunctionKind getKind() { return FunctionKind.SCALAR; } + @Override + public TypeInference getTypeInference(DataTypeFactory factory) { + throw new TableException("Functions implemented for the old type system are not supported."); + } + @Override public Set getRequirements() { return scalarFunction.getRequirements(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java index 9b39efbe33b68..c8ee292989852 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java @@ -19,6 +19,9 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Collector; /** @@ -129,4 +132,9 @@ public interface RetractableCollector extends Collector { public final FunctionKind getKind() { return FunctionKind.TABLE_AGGREGATE; } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Table aggregate functions are not updated to the new type system yet."); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java index d5c4f1c685074..dd4eb18e58d94 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java @@ -18,19 +18,23 @@ package org.apache.flink.table.functions; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Preconditions; import java.util.Objects; import java.util.Set; /** - * The function definition of an user-defined table aggregate function. + * A "marker" function definition of an user-defined table aggregate function that uses the old type + * system stack. * *

This class can be dropped once we introduce a new type inference. */ -@PublicEvolving +@Internal public final class TableAggregateFunctionDefinition implements FunctionDefinition { private final String name; @@ -70,6 +74,11 @@ public FunctionKind getKind() { return FunctionKind.TABLE_AGGREGATE; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Functions implemented for the old type system are not supported."); + } + @Override public Set getRequirements() { return aggregateFunction.getRequirements(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java index 562ee6390cf0a..059ea7ab63013 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java @@ -22,7 +22,10 @@ import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Collector; /** @@ -149,4 +152,9 @@ protected final void collect(T row) { public final FunctionKind getKind() { return FunctionKind.TABLE; } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Table functions are not updated to the new type system yet."); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java index 8a6392f0b7280..42607f7542584 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java @@ -18,19 +18,23 @@ package org.apache.flink.table.functions; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Preconditions; import java.util.Objects; import java.util.Set; /** - * The function definition of an user-defined table function. + * A "marker" function definition of an user-defined table function that uses the old type system + * stack. * *

This class can be dropped once we introduce a new type inference. */ -@PublicEvolving +@Internal public final class TableFunctionDefinition implements FunctionDefinition { private final String name; @@ -63,6 +67,11 @@ public FunctionKind getKind() { return FunctionKind.TABLE; } + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + throw new TableException("Functions implemented for the old type system are not supported."); + } + @Override public Set getRequirements() { return tableFunction.getRequirements(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java index a4c3261eeede7..48be28dd00d36 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java @@ -19,6 +19,11 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.utils.EncodingUtils; import java.io.Serializable; @@ -62,6 +67,31 @@ public void close() throws Exception { // do nothing } + /** + * {@inheritDoc} + * + *

The type inference for user-defined functions is automatically extracted using reflection. It + * does this by analyzing implementation methods such as {@code eval() or accumulate()} and the generic + * parameters of a function class if present. If the reflective information is not sufficient, it can + * be supported and enriched with {@link DataTypeHint} and {@link FunctionHint} annotations. + * + *

Note: Overriding this method is only recommended for advanced users. If a custom type inference + * is specified, it is the responsibility of the implementer to make sure that the output of the type + * inference process matches with the implementation method: + * + *

The implementation method must comply with each {@link DataType#getConversionClass()} returned + * by the type inference. For example, if {@code DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)} + * is an expected argument type, the method must accept a call {@code eval(java.sql.Timestamp)}. + * + *

Regular Java calling semantics (including type widening and autoboxing) are applied when calling + * an implementation method which means that the signature can be {@code eval(java.lang.Object)}. + * + *

The runtime will take care of converting the data to the data format specified by the + * {@link DataType#getConversionClass()} coming from the type inference logic. + */ + @Override + public abstract TypeInference getTypeInference(DataTypeFactory typeFactory); + /** * Returns the name of the UDF that is used for plan explanation and logging. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java index 5c6cf6fc2112f..43aaa7f2a9895 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java @@ -157,9 +157,9 @@ public static DataType inferOutputType(CallContext callContext, TypeStrategy out * Generates a signature of the given {@link FunctionDefinition}. */ public static String generateSignature( + TypeInference typeInference, String name, - FunctionDefinition definition, - TypeInference typeInference) { + FunctionDefinition definition) { if (typeInference.getNamedArguments().isPresent() || typeInference.getTypedArguments().isPresent()) { return formatNamedOrTypedArguments(name, typeInference); } @@ -180,9 +180,9 @@ public static ValidationException createInvalidInputException( String.format( "Invalid input arguments. Expected signatures are:\n%s", generateSignature( + typeInference, callContext.getName(), - callContext.getFunctionDefinition(), - typeInference) + callContext.getFunctionDefinition()) ), cause); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java index 19adce66ded59..5126f3ee4ab37 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java @@ -413,7 +413,7 @@ public void testStrategy() { private String generateSignature() { final FunctionDefinitionMock functionDefinitionMock = new FunctionDefinitionMock(); functionDefinitionMock.functionKind = FunctionKind.SCALAR; - return TypeInferenceUtil.generateSignature("f", functionDefinitionMock, createTypeInference()); + return TypeInferenceUtil.generateSignature(createTypeInference(), "f", functionDefinitionMock); } private TypeInferenceUtil.Result runTypeInference(List actualArgumentTypes) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/FunctionDefinitionMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/FunctionDefinitionMock.java index 2f87d5cb2f378..49a644fe3be53 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/FunctionDefinitionMock.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/FunctionDefinitionMock.java @@ -18,8 +18,10 @@ package org.apache.flink.table.types.inference.utils; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.types.inference.TypeInference; /** * {@link FunctionDefinition} mock for testing purposes. @@ -28,8 +30,15 @@ public class FunctionDefinitionMock implements FunctionDefinition { public FunctionKind functionKind; + public TypeInference typeInference; + @Override public FunctionKind getKind() { return functionKind; } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return typeInference; + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java index 61cc34764b428..43b5b65560392 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Optional; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext; + /** * Planner expression resolver for {@link UnresolvedCallExpression}. */ @@ -39,15 +41,13 @@ public class CallExpressionResolver { private final ExpressionResolver resolver; public CallExpressionResolver(RelBuilder relBuilder) { - // dummy way to get context - FlinkContext context = relBuilder - .values(new String[]{"dummyField"}, "dummyValue") - .build() - .getCluster().getPlanner().getContext().unwrap(FlinkContext.class); + FlinkContext context = unwrapContext(relBuilder.getCluster()); this.resolver = ExpressionResolver.resolverFor( - context.getTableConfig(), - name -> Optional.empty(), - context.getFunctionCatalog()).build(); + context.getTableConfig(), + name -> Optional.empty(), + context.getFunctionCatalog(), + context.getCatalogManager().getDataTypeFactory()) + .build(); } public ResolvedExpression resolve(Expression expression) { diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java index c58aec8856816..f6aeccd764ea3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java @@ -18,11 +18,13 @@ package org.apache.flink.table.planner.functions.aggfunctions; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.UnresolvedReferenceExpression; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.util.Preconditions; import java.util.Arrays; @@ -172,4 +174,9 @@ public final UnresolvedReferenceExpression[] mergeOperands() { public final FunctionKind getKind() { return FunctionKind.OTHER; } + + @Override + public TypeInference getTypeInference(DataTypeFactory factory) { + throw new UnsupportedOperationException(); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java index 09596958a47eb..330f3c4dbd647 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java @@ -103,7 +103,7 @@ public SqlOperandCountRange getOperandCountRange() { @Override public String getAllowedSignatures(SqlOperator op, String opName) { - return TypeInferenceUtil.generateSignature(opName, definition, typeInference); + return TypeInferenceUtil.generateSignature(typeInference, opName, definition); } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java index 36f06b349fed5..45680c958d681 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java @@ -19,9 +19,12 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.SqlOperatorBinding; @@ -49,6 +52,22 @@ public static FlinkTypeFactory unwrapTypeFactory(RelDataTypeFactory typeFactory) return (FlinkTypeFactory) typeFactory; } + public static FlinkContext unwrapContext(RelNode relNode) { + return unwrapContext(relNode.getCluster()); + } + + public static FlinkContext unwrapContext(RelOptCluster cluster) { + return unwrapContext(cluster.getPlanner()); + } + + public static FlinkContext unwrapContext(RelOptPlanner planner) { + return unwrapContext(planner.getContext()); + } + + public static FlinkContext unwrapContext(Context context) { + return context.unwrap(FlinkContext.class); + } + private ShortcutUtils() { // no instantiation } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml index 76c042cba05a9..9ae82a8fae944 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml @@ -58,7 +58,7 @@ LogicalIntersect(all=[true]) (vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))]) +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt]) +- Exchange(distribution=[hash[c]]) @@ -181,7 +181,7 @@ LogicalMinus(all=[true]) (sum_vcol_marker, 0)]) +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[c]]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml index 8adaeb28a1c6b..cf5ede6287db4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CalcTest.xml @@ -19,7 +19,7 @@ limitations under the License. @@ -169,7 +169,7 @@ Calc(select=[a]) @@ -221,7 +221,7 @@ Calc(select=[a, b]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml index f50de9c89e23c..25b36eda673f4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml @@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9ac48a5444b14d587749b42aef392f1d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 20)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -47,13 +47,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9ac48a5444b14d587749b42aef392f1d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 20)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -64,13 +64,13 @@ Calc(select=[c, d]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -81,13 +81,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -99,13 +99,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')]) +- LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> (s, _UTF-16LE'')]) -+- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -116,13 +116,13 @@ Calc(select=[c, s], where=[>(s, _UTF-16LE'')]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml index e8e9c05285468..dcb6af56f9e92 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml @@ -20,7 +20,7 @@ limitations under the License. =($0, 0)]) -+- LogicalProject(c1=[org$apache$flink$table$planner$plan$batch$table$JoinTest$Merger$$ad6edb4d4c8a8ac04216f9aeaab1e36f($2, org$apache$flink$table$planner$plan$batch$table$JoinTest$Merger$$ad6edb4d4c8a8ac04216f9aeaab1e36f($2, $5))]) ++- LogicalProject(c1=[org$apache$flink$table$planner$plan$batch$table$JoinTest$Merger$$234a0810cc9eb576e09d551c1fe0de50($2, org$apache$flink$table$planner$plan$batch$table$JoinTest$Merger$$234a0810cc9eb576e09d551c1fe0de50($2, $5))]) +- LogicalJoin(condition=[=($1, $4)], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, Table2, source: [TestTableSource(d, e, f)]]]) @@ -212,55 +212,54 @@ Calc(select=[c, g]) ]]> - + - + - + @@ -268,31 +267,32 @@ LogicalProject(b=[$1], y=[$4]) - - - + + - - - + + - - + + diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml index dac07de4b1512..a4d2c7b129dfc 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml @@ -34,7 +34,7 @@ Calc(select=[EXPR$0 AS a, b, EXPR$1 AS c]) +- Exchange(distribution=[hash[b]]) +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) AS sum$0, Partial_COUNT(c) AS count$1]) +- Calc(select=[a0 AS a, b0 AS b, c0 AS c]) - +- Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$6c050a89d06b92467327f90624300f38($0, $1, $2, $3)], correlate=[table(ReplicateRows(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER]) + +- Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$c170efff0e35705d2a5ea1420b200ef8($0, $1, $2, $3)], correlate=[table(ReplicateRows(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, a, b, c], where=[>(sum_vcol_marker, 0)]) +- HashAggregate(isMerge=[true], groupBy=[a, b, c], select=[a, b, c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[a, b, c]]) @@ -124,7 +124,7 @@ LogicalProject(b=[$1], c=[$2]) (sum_vcol_marker, 0)]) +- HashAggregate(isMerge=[true], groupBy=[b, c], select=[b, c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[b, c]]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml index 66b48aff88b7c..5dbc41dd4474a 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml @@ -22,13 +22,13 @@ limitations under the License. LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -39,13 +39,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -56,13 +56,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -73,13 +73,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$17bf4cfa4edf33541607b8bd603955ac($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -90,13 +90,13 @@ Calc(select=[c, name, len]) LogicalProject(c=[$2], name=[$3], len=[$5], adult=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$HierarchyTableFunction$f774f4099ee1f822db4843f4baef20d3($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$HierarchyTableFunction$d410fa6e6e30ed78d268f393aa448fc0($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -107,13 +107,13 @@ Calc(select=[c, name, len, adult]) LogicalProject(c=[$2], name=[$4], age=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$PojoTableFunc$ed1520cba05608cdb804b2bd34004cea($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$PojoTableFunc$caef025549fa8b78c6c43c72b9eecf8a($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -125,13 +125,13 @@ LogicalFilter(condition=[>($2, 2)]) +- LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$17bf4cfa4edf33541607b8bd603955ac($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 2)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -142,13 +142,13 @@ Calc(select=[c, name, len]) LogicalProject(a=[$0], c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml index 13693a851816b..6c58346516dbf 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml @@ -19,7 +19,6 @@ limitations under the License. 10]]> - ($3, 10)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - ($3, 10)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - 10]]> - - ($3, 10)]) +- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - 2 OR amount < 10]]> - ($2, 2), <($2, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - ($2, 2), <($2, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - 2 AND amount < 10]]> - - - 2 OR amount < 10]]> - - ($2, 2), <($2, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - 2]]> - ($2, 2)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - - 2]]> - - - 2 AND amount < 10]]> - ($2, 2), <($2, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - - 2 AND price > 10]]> - ($2, 2), >($3, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - ($3, 10)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[true], filter=[greaterThan(amount, 2)]]]]) ]]> - 2 OR price > 10]]> - ($2, 2), >($3, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - ($2, 2), >($3, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - @@ -246,7 +216,6 @@ LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3]) SELECT * FROM MyTable WHERE amount > 2 AND id < 100 AND CAST(amount AS BIGINT) > 10 ]]> - ($2, 2), <($1, 100), >(CAST($2):BIGINT, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - (CAST($2):BIGINT, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[true], filter=[greaterThan(amount, 2)]]]]) ]]> - 2 OR price > 10]]> - - ($2, 2), >($3, 10))]) +- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable, source: [filterPushedDown=[true], filter=[]]]]) ]]> - 2 AND myUdf(amount) < 32]]> - ($2, 2), <(myUdf($2), 32))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filterPushedDown=[false], filter=[]]]]) ]]> - - 2 AND price > 10]]> - - ($3, 10)]) +- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable, source: [filterPushedDown=[true], filter=[greaterThan(amount, 2)]]]]) ]]> - diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml index d22b31dc87d83..3e16ba7d9a573 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml @@ -43,7 +43,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -76,7 +76,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -111,7 +111,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(d=[$0], e=[$1], f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$6c050a89d06b92467327f90624300f38($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$c170efff0e35705d2a5ea1420b200ef8($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -144,7 +144,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$2]) : +- LogicalFilter(condition=[=(1, 0)]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml index 2f1af858c018f..9a59ea0d4d0ec 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml @@ -43,7 +43,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -76,7 +76,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -109,7 +109,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$2]) : +- LogicalFilter(condition=[=(1, 0)]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$d2d31446db8fc621d87786316146cb4d($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$4b48a61becef029b0dc0cd7eef2228cb($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -144,7 +144,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_marker=[-1:BIGINT]) : +- LogicalProject(d=[$0], e=[$1], f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$6c050a89d06b92467327f90624300f38($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$c170efff0e35705d2a5ea1420b200ef8($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], elementType=[class [Ljava.lang.Object;]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml index 0bcc1c6b93b1b..d4cc206fa4886 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml @@ -59,7 +59,7 @@ LogicalIntersect(all=[true]) (vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))]) +- GroupAggregate(groupBy=[c], select=[c, COUNT(vcol_left_marker) AS vcol_left_cnt, COUNT(vcol_right_marker) AS vcol_right_cnt]) +- Exchange(distribution=[hash[c]]) @@ -183,7 +183,7 @@ LogicalMinus(all=[true]) (sum_vcol_marker, 0)]) +- GroupAggregate(groupBy=[c], select=[c, SUM(vcol_marker) AS sum_vcol_marker]) +- Exchange(distribution=[hash[c]]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml index 2a44ce2e15395..a32177e9e53c9 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml @@ -62,7 +62,7 @@ Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(2147483647) CHARACTER SET "UTF-16L @@ -93,7 +93,7 @@ Calc(select=[a, b], where=[AND(>(a, 0), <(b, 2), =(MOD(a, 2), 1))]) @@ -122,7 +122,7 @@ Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml index ec37bc43f52ae..bf85e1b4e355b 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml @@ -19,7 +19,7 @@ limitations under the License. @@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[=(int1, int2)], select=[int1, long1, string1, @@ -166,7 +166,7 @@ Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9ac48a5444 @@ -196,7 +196,7 @@ Calc(select=[a AS d, b]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml index 5dde28f1c28f9..5dd50189ad8fd 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml @@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9ac48a5444b14d587749b42aef392f1d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 20)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -47,13 +47,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9ac48a5444b14d587749b42aef392f1d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 20)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc0$9c0e1636b85ac954c6f6eac143be7b3d($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -64,13 +64,13 @@ Calc(select=[c, d]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -81,13 +81,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -98,13 +98,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$17bf4cfa4edf33541607b8bd603955ac(org$apache$flink$table$planner$expressions$utils$Func13$448f77ca0acfcec5a03bbafbbaeae74b($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69(org$apache$flink$table$planner$expressions$utils$Func13$aceadf1af6c698a4705a8fbd3984d0a3($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -116,13 +116,13 @@ LogicalFilter(condition=[>($2, 2)]) +- LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$17bf4cfa4edf33541607b8bd603955ac($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ]]> ($1, 2)]) ++- Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -132,12 +132,12 @@ Calc(select=[c, name, len]) @@ -148,13 +148,13 @@ Correlate(invocation=[org$apache$flink$table$planner$utils$HierarchyTableFunctio LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$17bf4cfa4edf33541607b8bd603955ac($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e0361b1a53c7c0ae82ed4d28e9919e69($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -165,13 +165,13 @@ Calc(select=[f0, f1_0 AS f1]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$4f0c749afc756b8e787eefbe846d0b73($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -181,12 +181,12 @@ Calc(select=[c, s]) @@ -196,12 +196,12 @@ Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803 diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml index 5f5a999b79618..84b62c761239f 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml @@ -53,7 +53,7 @@ Calc(select=[c, w0$o0 AS _c1]) @@ -70,7 +70,7 @@ Calc(select=[a, w0$o0 AS myAvg]) @@ -121,7 +121,7 @@ Calc(select=[c, w0$o0 AS _c1]) @@ -138,7 +138,7 @@ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3]) @@ -172,7 +172,7 @@ Calc(select=[a, w0$o0 AS _c1]) @@ -206,7 +206,7 @@ Calc(select=[c, w0$o0 AS _c1]) @@ -257,7 +257,7 @@ Calc(select=[c, w0$o0 AS _c1]) @@ -274,7 +274,7 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS wAvg]) @@ -291,7 +291,7 @@ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS wAvg]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml index 4792ed2903eee..5cbc8cf92d2f5 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableAggregateTest.xml @@ -87,7 +87,7 @@ GroupTableAggregate(groupBy=[f0], select=[f0, EmptyTableAggFuncWithIntResultType diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index d52448638069a..ba9de94d7a2e5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -92,6 +92,7 @@ abstract class TableEnvImpl( private[flink] val operationTreeBuilder = OperationTreeBuilder.create( config, functionCatalog, + catalogManager.getDataTypeFactory, tableLookup, isStreamingMode)