[FLINK-4420] [table] Introduce star(*) to select all of the columns in the table

This closes apache#2384.
wuchong authored and twalthr committed Aug 29, 2016
1 parent c2585c6 commit 1f17886
Showing 9 changed files with 132 additions and 8 deletions.
9 changes: 9 additions & 0 deletions docs/dev/table_api.md
@@ -500,6 +500,10 @@ This section gives a brief overview of the available operators. You can find mor
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.select("a, c as d");
{% endhighlight %}
<p>You can use the star (<code>*</code>) wildcard to select all of the columns in the table.</p>
{% highlight java %}
Table result = in.select("*");
{% endhighlight %}
</td>
</tr>
@@ -722,6 +726,11 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
{% highlight scala %}
val in = ds.toTable(tableEnv, 'a, 'b, 'c)
val result = in.select('a, 'c as 'd)
{% endhighlight %}
<p>You can use the star (<code>*</code>) wildcard to select all of the columns in the table.</p>
{% highlight scala %}
val in = ds.toTable(tableEnv, 'a, 'b, 'c)
val result = in.select('*)
{% endhighlight %}
</td>
</tr>
@@ -265,6 +265,11 @@ abstract class TableEnvironment(val config: TableConfig) {
throw new TableException(s"Type $tpe lacks explicit field naming")
}
val fieldIndexes = fieldNames.indices.toArray

if (fieldNames.contains("*")) {
throw new ValidationException("Field name cannot be '*'.")
}

(fieldNames, fieldIndexes)
}

@@ -336,6 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) {
}

val (fieldIndexes, fieldNames) = indexedNames.unzip

if (fieldNames.contains("*")) {
throw new ValidationException("Field name cannot be '*'.")
}

(fieldNames.toArray, fieldIndexes.toArray)
}
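A hedged illustration of the guard above, assuming a Scala BatchTableEnvironment named `tableEnv` and a `DataSet` named `ds` (mirroring the tests at the end of this commit):

val invalid = ds.toTable(tableEnv, '*, 'b, 'c) // throws ValidationException: Field name cannot be '*'.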

@@ -75,6 +75,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val MINUTE: Keyword = Keyword("minute")
lazy val SECOND: Keyword = Keyword("second")
lazy val MILLI: Keyword = Keyword("milli")
lazy val STAR: Keyword = Keyword("*")

def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -159,7 +160,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
stringLiteralFlink | singleQuoteStringLiteral |
boolLiteral | nullLiteral

lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
sym => UnresolvedFieldReference(sym)
}
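For illustration, a minimal sketch of what the grammar change enables, assuming the `parseExpressionList` entry point that backs the String-based API:

import org.apache.flink.api.table.expressions.ExpressionParser

// "*" now parses like any identifier and yields UnresolvedFieldReference("*");
// expansion to the full column list happens later, during plan translation.
val exprs = ExpressionParser.parseExpressionList("*")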

@@ -22,7 +22,8 @@ import org.apache.calcite.tools.RelBuilder

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.UnresolvedException
import org.apache.flink.api.table.validate.{ValidationSuccess, ExprValidationResult,
ValidationFailure}

trait NamedExpression extends Expression {
private[flink] def name: String
@@ -91,6 +92,14 @@ case class Alias(child: Expression, name: String)
UnresolvedFieldReference(name)
}
}

override private[flink] def validateInput(): ExprValidationResult = {
if (name == "*") {
ValidationFailure("Alias cannot accept '*' as a name.")
} else {
ValidationSuccess
}
}
}
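A hedged illustration of the new check, mirroring the Scala tests below:

val bad = ds.toTable(tableEnv).select('_1 as '*) // fails validation: Alias cannot accept '*' as a name.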

case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
@@ -20,6 +20,9 @@ package org.apache.flink.api.table.plan

import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.plan.logical.LogicalNode

import scala.collection.mutable.ListBuffer

object RexNodeTranslator {

@@ -68,4 +71,18 @@ object RexNodeTranslator {
(e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList)
}
}

/**
* Wraps all input expressions in [[UnresolvedAlias]] and expands a star ("*")
* reference to the parent's full project list.
*/
def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
val projectList = new ListBuffer[NamedExpression]
exprs.foreach {
case n: UnresolvedFieldReference if n.name == "*" =>
projectList ++= parent.output.map(UnresolvedAlias(_))
case e: Expression => projectList += UnresolvedAlias(e)
}
projectList
}
}
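A simplified, self-contained sketch of the expansion logic above; `FieldRef` and `expandFields` are illustrative stand-ins, not Flink types:

case class FieldRef(name: String)

// A "*" reference is replaced by the parent's full output; every other
// expression passes through unchanged.
def expandFields(exprs: Seq[FieldRef], parentOutput: Seq[FieldRef]): Seq[FieldRef] =
  exprs.flatMap {
    case FieldRef("*") => parentOutput
    case e             => Seq(e)
  }

// expandFields(Seq(FieldRef("*")), Seq(FieldRef("a"), FieldRef("b")))
// returns Seq(FieldRef("a"), FieldRef("b"))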
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
case n: Alias =>
// explicit name
if (names.contains(n.name)) {
throw ValidationException(s"Duplicate field name $n.name.")
throw ValidationException(s"Duplicate field name ${n.name}.")
} else {
names.add(n.name)
}
case r: ResolvedFieldReference =>
// simple field forwarding
if (names.contains(r.name)) {
throw ValidationException(s"Duplicate field name $r.name.")
throw ValidationException(s"Duplicate field name ${r.name}.")
} else {
names.add(r.name)
}
@@ -109,6 +109,8 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una
failValidation("Aliasing more fields than we actually have")
} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
failValidation("Alias only accept name expressions as arguments")
} else if (aliasList.exists(_.asInstanceOf[UnresolvedFieldReference].name == "*")) {
failValidation("Alias cannot accept '*' as a name")
} else {
val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
val input = child.output
@@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression, Ordering}
import org.apache.flink.api.table.plan.RexNodeTranslator._
import org.apache.flink.api.table.plan.logical._
import org.apache.flink.api.table.sinks.TableSink

@@ -78,14 +78,14 @@ class Table(
def select(fields: Expression*): Table = {
val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv))
val aggregations = projectionOnAggregates.flatMap(_._2)
val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan)
if (aggregations.nonEmpty) {
new Table(tableEnv,
Project(projectList,
Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv))
} else {
new Table(tableEnv,
Project(projectList, logicalPlan).validate(tableEnv))
}
}
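For context, a hedged usage sketch assuming a table `in` with fields 'a, 'b, 'c (as in the docs above): the star expands against the input's full output, so both calls below produce the same plan.

val all      = in.select('*)
val explicit = in.select('a, 'b, 'c)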

@@ -127,4 +127,27 @@ public void testSelectAmbiguousFieldNames() throws Exception {
// Must fail. Field name foo is ambiguous (assigned twice)
.select("a + 1 as foo, b + 2 as foo");
}

@Test
public void testSelectStar() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
Table in = tableEnv.fromDataSet(ds, "a,b,c");

Table result = in
.select("*");

DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
"20,6,Comment#14\n" + "21,6,Comment#15\n";
compareResultAsText(results, expected);
}
}
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Assert._
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -134,4 +135,56 @@ class SelectITCase(
.select('a, 'b as 'a).toDataSet[Row].print()
}

@Test
def testSelectStar(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)

val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
"15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
"19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testAliasStarException(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

try {
CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => // ignore
}

try {
CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
.select('_1 as '*, '_2 as 'b, '_1 as 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => // ignore
}

try {
CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
fail("ValidationException expected")
} catch {
case _: ValidationException => // ignore
}

try {
CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
fail("ValidationException expected")
} catch {
case _: ValidationException => // ignore
}
}

}
