diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index d3523a85cbb52..66fa84aca2a87 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -23,6 +23,7 @@ import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategies; import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.util.Preconditions; import java.lang.reflect.Field; @@ -35,6 +36,11 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; import static org.apache.flink.table.functions.FunctionKind.OTHER; import static org.apache.flink.table.functions.FunctionKind.SCALAR; +import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL; +import static org.apache.flink.table.types.inference.InputTypeStrategies.and; +import static org.apache.flink.table.types.inference.InputTypeStrategies.logical; +import static org.apache.flink.table.types.inference.InputTypeStrategies.or; +import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence; /** * Dictionary of function definitions for all built-in functions. @@ -886,7 +892,12 @@ public final class BuiltInFunctionDefinitions { new BuiltInFunctionDefinition.Builder() .name("as") .kind(OTHER) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + varyingSequence( + or(OUTPUT_IF_NULL, InputTypeStrategies.ANY), + and(InputTypeStrategies.LITERAL, logical(LogicalTypeFamily.CHARACTER_STRING)), + and(InputTypeStrategies.LITERAL, logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(TypeStrategies.argument(0)) .build(); public static final BuiltInFunctionDefinition STREAM_RECORD_TIMESTAMP = new BuiltInFunctionDefinition.Builder() diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala index e3380556fea5f..3f6ced979f775 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala @@ -149,14 +149,6 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp case fd: FunctionDefinition => fd match { - case AS => - assert(args.size >= 2) - val name = getValue[String](args(1)) - val extraNames = args - .drop(2) - .map(e => getValue[String](e)) - Alias(args.head, name, extraNames) - case FLATTEN => assert(args.size == 1) Flattening(args.head) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala index 1a0f5b31992ee..fd86f6749124f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala @@ -80,49 +80,6 @@ case class PlannerResolvedFieldReference( } } -case class Alias(child: PlannerExpression, name: String, extraNames: Seq[String] = Seq()) - extends UnaryExpression with NamedExpression { - - override def toString = s"$child as '$name" - - override private[flink] def resultType: TypeInformation[_] = child.resultType - - override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = { - val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression] - copy(child, name, extraNames).asInstanceOf[this.type] - } - - override private[flink] def toAttribute: Attribute = { - if (valid) { - PlannerResolvedFieldReference(name, child.resultType) - } else { - UnresolvedFieldReference(name) - } - } - - override private[flink] def validateInput(): ValidationResult = { - if (name == "*") { - ValidationFailure("Alias can not accept '*' as name.") - } else { - ValidationSuccess - } - } -} - -case class UnresolvedAlias(child: PlannerExpression) extends UnaryExpression with NamedExpression { - - override private[flink] def name: String = - throw new UnresolvedException("Invalid call to name on UnresolvedAlias") - - override private[flink] def toAttribute: Attribute = - throw new UnresolvedException("Invalid call to toAttribute on UnresolvedAlias") - - override private[flink] def resultType: TypeInformation[_] = - throw new UnresolvedException("Invalid call to resultType on UnresolvedAlias") - - override private[flink] lazy val valid = false -} - case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute { override private[flink] def resultType: TypeInformation[_] = diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CalcValidationTest.scala index b07fbcee7210f..14b2a19beabdf 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CalcValidationTest.scala @@ -84,14 +84,6 @@ class CalcValidationTest extends TableTestBase { case _: ValidationException => //ignore } - try { - util.addTableSource[(Int, Long, String)]("Table2") - .select('_1 as '*, '_2 as 'b, '_1 as 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - try { util.addTableSource[(Int, Long, String)]("Table3").as("*", "b", "c") fail("ValidationException expected") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala index b168aa90f415b..1be3689c980c1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala @@ -102,14 +102,6 @@ class CalcValidationTest extends TableTestBase { case _: ValidationException => //ignore } - try { - util.addTable[(Int, Long, String)]("Table2") - .select('_1 as '*, '_2 as 'b, '_1 as 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - try { util.addTable[(Int, Long, String)]("Table3").as("*", "b", "c") fail("ValidationException expected")