Skip to content

Commit

Permalink
[FLINK-17936][table] Introduce new type inference for AS
Browse files Browse the repository at this point in the history
Introduces the last missing pieces required to start porting the
other expressions for a consistent API behavior.

This closes apache#12331.
  • Loading branch information
twalthr committed May 26, 2020
1 parent 1cce2c5 commit b2db9ec
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.util.Preconditions;

import java.lang.reflect.Field;
Expand All @@ -35,6 +36,11 @@
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
import static org.apache.flink.table.functions.FunctionKind.OTHER;
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;

/**
* Dictionary of function definitions for all built-in functions.
Expand Down Expand Up @@ -886,7 +892,12 @@ public final class BuiltInFunctionDefinitions {
new BuiltInFunctionDefinition.Builder()
.name("as")
.kind(OTHER)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(
varyingSequence(
or(OUTPUT_IF_NULL, InputTypeStrategies.ANY),
and(InputTypeStrategies.LITERAL, logical(LogicalTypeFamily.CHARACTER_STRING)),
and(InputTypeStrategies.LITERAL, logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(TypeStrategies.argument(0))
.build();
public static final BuiltInFunctionDefinition STREAM_RECORD_TIMESTAMP =
new BuiltInFunctionDefinition.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,6 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
case fd: FunctionDefinition =>
fd match {

case AS =>
assert(args.size >= 2)
val name = getValue[String](args(1))
val extraNames = args
.drop(2)
.map(e => getValue[String](e))
Alias(args.head, name, extraNames)

case FLATTEN =>
assert(args.size == 1)
Flattening(args.head)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,49 +80,6 @@ case class PlannerResolvedFieldReference(
}
}

case class Alias(child: PlannerExpression, name: String, extraNames: Seq[String] = Seq())
extends UnaryExpression with NamedExpression {

override def toString = s"$child as '$name"

override private[flink] def resultType: TypeInformation[_] = child.resultType

override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
copy(child, name, extraNames).asInstanceOf[this.type]
}

override private[flink] def toAttribute: Attribute = {
if (valid) {
PlannerResolvedFieldReference(name, child.resultType)
} else {
UnresolvedFieldReference(name)
}
}

override private[flink] def validateInput(): ValidationResult = {
if (name == "*") {
ValidationFailure("Alias can not accept '*' as name.")
} else {
ValidationSuccess
}
}
}

case class UnresolvedAlias(child: PlannerExpression) extends UnaryExpression with NamedExpression {

override private[flink] def name: String =
throw new UnresolvedException("Invalid call to name on UnresolvedAlias")

override private[flink] def toAttribute: Attribute =
throw new UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")

override private[flink] def resultType: TypeInformation[_] =
throw new UnresolvedException("Invalid call to resultType on UnresolvedAlias")

override private[flink] lazy val valid = false
}

case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {

override private[flink] def resultType: TypeInformation[_] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ class CalcValidationTest extends TableTestBase {
case _: ValidationException => //ignore
}

try {
util.addTableSource[(Int, Long, String)]("Table2")
.select('_1 as '*, '_2 as 'b, '_1 as 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => //ignore
}

try {
util.addTableSource[(Int, Long, String)]("Table3").as("*", "b", "c")
fail("ValidationException expected")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,6 @@ class CalcValidationTest extends TableTestBase {
case _: ValidationException => //ignore
}

try {
util.addTable[(Int, Long, String)]("Table2")
.select('_1 as '*, '_2 as 'b, '_1 as 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => //ignore
}

try {
util.addTable[(Int, Long, String)]("Table3").as("*", "b", "c")
fail("ValidationException expected")
Expand Down

0 comments on commit b2db9ec

Please sign in to comment.