Skip to content

Commit

Permalink
[FLINK-2000][table] Add SQL-style Aggregation Support
Browse files Browse the repository at this point in the history
  • Loading branch information
chhao01 authored and aljoscha committed Jun 8, 2015
1 parent 804430b commit 7805db8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// KeyWord

lazy val AS: Keyword = Keyword("as")
lazy val COUNT: Keyword = Keyword("count")
lazy val AVG: Keyword = Keyword("avg")
lazy val MIN: Keyword = Keyword("min")
lazy val MAX: Keyword = Keyword("max")
lazy val SUM: Keyword = Keyword("sum")

// Literals

Expand Down Expand Up @@ -91,11 +96,16 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {

lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }

lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) }
lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
lazy val sum: PackratParser[Expression] =
(atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
lazy val min: PackratParser[Expression] =
(atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
lazy val max: PackratParser[Expression] =
(atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
lazy val count: PackratParser[Expression] =
(atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
lazy val avg: PackratParser[Expression] =
(atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })

lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
case e ~ _ ~ as ~ _ => Naming(e, as.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,25 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
env.execute()
expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
}

@Test
def testSQLStyleAggregations(): Unit = {

// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output

val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
.select(
"""Sum( a) as a1, a.sum as a2,
|Min (a) as b1, a.min as b2,
|Max (a ) as c1, a.max as c2,
|Avg ( a ) as d1, a.avg as d2,
|Count(a) as e1, a.count as e2
""".stripMargin)

ds.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
expected = "231,231,1,1,21,21,11,11,21,21"
}
}

0 comments on commit 7805db8

Please sign in to comment.