Skip to content

Commit

Permalink
[FLINK-15487][table-common] Make TypeInference mandatory for function…
Browse files Browse the repository at this point in the history
… definitions

This makes TypeInference mandatory for function definitions and updates
FunctionDefinition, UserDefinedFunction, and ScalarFunction to FLIP-65.

This closes apache#10928.
  • Loading branch information
twalthr committed Jan 23, 2020
1 parent 338ae70 commit 2e62ae7
Show file tree
Hide file tree
Showing 42 changed files with 450 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected TableEnvironmentImpl(
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog,
catalogManager.getDataTypeFactory(),
path -> {
try {
UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.api.OverWindow;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
Expand Down Expand Up @@ -104,6 +105,8 @@ public static List<ResolverRule> getAllResolverRules() {

private final FunctionLookup functionLookup;

private final DataTypeFactory typeFactory;

private final PostResolverFactory postResolverFactory = new PostResolverFactory();

private final Map<String, LocalReferenceExpression> localReferences;
Expand All @@ -114,13 +117,15 @@ private ExpressionResolver(
TableConfig config,
TableReferenceLookup tableLookup,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
FieldReferenceLookup fieldLookup,
List<OverWindow> localOverWindows,
List<LocalReferenceExpression> localReferences) {
this.config = Preconditions.checkNotNull(config);
this.tableLookup = Preconditions.checkNotNull(tableLookup);
this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
this.functionLookup = Preconditions.checkNotNull(functionLookup);
this.typeFactory = Preconditions.checkNotNull(typeFactory);

this.localReferences = localReferences.stream().collect(Collectors.toMap(
LocalReferenceExpression::getName,
Expand All @@ -136,15 +141,22 @@ private ExpressionResolver(
* @param config general configuration
* @param tableCatalog a way to lookup a table reference by name
* @param functionLookup a way to lookup call by name
* @param typeFactory a way to lookup and create data types
* @param inputs inputs to use for field resolution
* @return builder for resolver
*/
public static ExpressionResolverBuilder resolverFor(
TableConfig config,
TableReferenceLookup tableCatalog,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
QueryOperation... inputs) {
return new ExpressionResolverBuilder(inputs, config, tableCatalog, functionLookup);
return new ExpressionResolverBuilder(
inputs,
config,
tableCatalog,
functionLookup,
typeFactory);
}

/**
Expand Down Expand Up @@ -268,6 +280,11 @@ public FunctionLookup functionLookup() {
return functionLookup;
}

@Override
public DataTypeFactory typeFactory() {
return typeFactory;
}

@Override
public PostResolverFactory postResolutionFactory() {
return postResolverFactory;
Expand Down Expand Up @@ -357,18 +374,21 @@ public static class ExpressionResolverBuilder {
private final List<QueryOperation> queryOperations;
private final TableReferenceLookup tableCatalog;
private final FunctionLookup functionLookup;
private final DataTypeFactory typeFactory;
private List<OverWindow> logicalOverWindows = new ArrayList<>();
private List<LocalReferenceExpression> localReferences = new ArrayList<>();

private ExpressionResolverBuilder(
QueryOperation[] queryOperations,
TableConfig config,
TableReferenceLookup tableCatalog,
FunctionLookup functionLookup) {
FunctionLookup functionLookup,
DataTypeFactory typeFactory) {
this.config = config;
this.queryOperations = Arrays.asList(queryOperations);
this.tableCatalog = tableCatalog;
this.functionLookup = functionLookup;
this.typeFactory = typeFactory;
}

public ExpressionResolverBuilder withOverWindows(List<OverWindow> windows) {
Expand All @@ -386,6 +406,7 @@ public ExpressionResolver build() {
config,
tableCatalog,
functionLookup,
typeFactory,
new FieldReferenceLookup(queryOperations),
logicalOverWindows,
localReferences);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
Expand Down Expand Up @@ -170,8 +169,9 @@ private List<ResolvedExpression> flattenCompositeType(ResolvedExpression composi
private Optional<TypeInference> getOptionalTypeInference(FunctionDefinition definition) {
if (definition instanceof BuiltInFunctionDefinition) {
final BuiltInFunctionDefinition builtInDefinition = (BuiltInFunctionDefinition) definition;
if (builtInDefinition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
return Optional.of(builtInDefinition.getTypeInference());
final TypeInference inference = builtInDefinition.getTypeInference(resolutionContext.typeFactory());
if (inference.getOutputTypeStrategy() != TypeStrategies.MISSING) {
return Optional.of(inference);
}
}
return Optional.empty();
Expand All @@ -187,7 +187,7 @@ private ResolvedExpression runTypeInference(
final Result inferenceResult = TypeInferenceUtil.runTypeInference(
inference,
new TableApiCallContext(
new UnsupportedDataTypeFactory(),
resolutionContext.typeFactory(),
name,
unresolvedCall.getFunctionDefinition(),
resolvedArgs),
Expand Down Expand Up @@ -275,29 +275,6 @@ private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definit

// --------------------------------------------------------------------------------------------

private static class UnsupportedDataTypeFactory implements DataTypeFactory {

@Override
public Optional<DataType> createDataType(String name) {
throw new TableException("Data type factory is not supported yet.");
}

@Override
public Optional<DataType> createDataType(UnresolvedIdentifier identifier) {
throw new TableException("Data type factory is not supported yet.");
}

@Override
public <T> DataType createDataType(Class<T> clazz) {
throw new TableException("Data type factory is not supported yet.");
}

@Override
public <T> DataType createRawDataType(Class<T> clazz) {
throw new TableException("Data type factory is not supported yet.");
}
}

private static class TableApiCallContext implements CallContext {

private final DataTypeFactory typeFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.LocalReferenceExpression;
Expand Down Expand Up @@ -68,6 +69,11 @@ interface ResolutionContext {
*/
FunctionLookup functionLookup();

/**
* Access to {@link DataTypeFactory}.
*/
DataTypeFactory typeFactory();

/**
* Enables the creation of resolved expressions for transformations after the actual resolution.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
Expand Down Expand Up @@ -88,6 +89,7 @@ public final class OperationTreeBuilder {

private final TableConfig config;
private final FunctionLookup functionCatalog;
private final DataTypeFactory typeFactory;
private final TableReferenceLookup tableReferenceLookup;
private final LookupCallResolver lookupResolver;

Expand All @@ -104,6 +106,7 @@ public final class OperationTreeBuilder {
private OperationTreeBuilder(
TableConfig config,
FunctionLookup functionLookup,
DataTypeFactory typeFactory,
TableReferenceLookup tableReferenceLookup,
ProjectionOperationFactory projectionOperationFactory,
SortOperationFactory sortOperationFactory,
Expand All @@ -113,6 +116,7 @@ private OperationTreeBuilder(
JoinOperationFactory joinOperationFactory) {
this.config = config;
this.functionCatalog = functionLookup;
this.typeFactory = typeFactory;
this.tableReferenceLookup = tableReferenceLookup;
this.projectionOperationFactory = projectionOperationFactory;
this.sortOperationFactory = sortOperationFactory;
Expand All @@ -126,11 +130,13 @@ private OperationTreeBuilder(
public static OperationTreeBuilder create(
TableConfig config,
FunctionLookup functionCatalog,
DataTypeFactory typeFactory,
TableReferenceLookup tableReferenceLookup,
boolean isStreamingMode) {
return new OperationTreeBuilder(
config,
functionCatalog,
typeFactory,
tableReferenceLookup,
new ProjectionOperationFactory(),
new SortOperationFactory(isStreamingMode),
Expand Down Expand Up @@ -178,6 +184,7 @@ private QueryOperation projectInternal(
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
child)
.withOverWindows(overWindows)
.build();
Expand Down Expand Up @@ -246,6 +253,7 @@ public QueryOperation windowAggregate(
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
child)
.withLocalReferences(
new LocalReferenceExpression(
Expand Down Expand Up @@ -296,6 +304,7 @@ public QueryOperation windowAggregate(
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
child)
.withLocalReferences(
new LocalReferenceExpression(
Expand Down Expand Up @@ -343,6 +352,7 @@ public QueryOperation join(
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
left,
right)
.build();
Expand Down Expand Up @@ -376,6 +386,7 @@ public Expression resolveExpression(Expression expression, QueryOperation... tab
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
tableOperation).build();

return resolveSingleExpression(expression, resolver);
Expand Down Expand Up @@ -545,11 +556,11 @@ private AggregateWithAlias(UnresolvedCallExpression aggregate, List<String> alia
private static class ExtractAliasAndAggregate extends ApiExpressionDefaultVisitor<AggregateWithAlias> {

// need this flag to validate alias, i.e., the length of alias and function result type should be same.
private boolean isRowbasedAggregate = false;
private ExpressionResolver resolver = null;
private boolean isRowBasedAggregate;
private ExpressionResolver resolver;

public ExtractAliasAndAggregate(boolean isRowbasedAggregate, ExpressionResolver resolver) {
this.isRowbasedAggregate = isRowbasedAggregate;
public ExtractAliasAndAggregate(boolean isRowBasedAggregate, ExpressionResolver resolver) {
this.isRowBasedAggregate = isRowBasedAggregate;
this.resolver = resolver;
}

Expand Down Expand Up @@ -596,7 +607,7 @@ private Optional<AggregateWithAlias> getAggregate(
} else {
ResolvedExpression resolvedExpression =
resolver.resolve(Collections.singletonList(unresolvedCall)).get(0);
validateAlias(aliases, resolvedExpression, isRowbasedAggregate);
validateAlias(aliases, resolvedExpression, isRowBasedAggregate);
fieldNames = aliases;
}
return Optional.of(new AggregateWithAlias(unresolvedCall, fieldNames));
Expand Down Expand Up @@ -681,6 +692,7 @@ public QueryOperation windowTableAggregate(
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
child)
.withLocalReferences(
new LocalReferenceExpression(
Expand Down Expand Up @@ -781,6 +793,7 @@ private ExpressionResolver getResolver(QueryOperation child) {
config,
tableReferenceLookup,
functionCatalog,
typeFactory,
child)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.table.functions;

import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;

import java.util.Set;

/**
Expand All @@ -30,6 +33,11 @@ public FunctionKind getKind() {
return FunctionKind.SCALAR;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
throw new UnsupportedOperationException();
}

@Override
public Set<FunctionRequirement> getRequirements() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.table.functions;

import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;

import java.util.Set;

/**
Expand All @@ -30,6 +33,11 @@ public FunctionKind getKind() {
return FunctionKind.SCALAR;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
throw new UnsupportedOperationException();
}

@Override
public Set<FunctionRequirement> getRequirements() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -144,6 +147,11 @@ public final FunctionKind getKind() {
return FunctionKind.AGGREGATE;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
throw new TableException("Aggregate functions are not updated to the new type system yet.");
}

@Override
public Set<FunctionRequirement> getRequirements() {
final HashSet<FunctionRequirement> requirements = new HashSet<>();
Expand Down
Loading

0 comments on commit 2e62ae7

Please sign in to comment.