Skip to content

Commit

Permalink
[FLINK-10676][table] Change the over window preceding clause from req…
Browse files Browse the repository at this point in the history
…uired to optional.

This closes apache#6949
  • Loading branch information
hequn8128 authored and sunjincheng121 committed Nov 7, 2018
1 parent b00c8d3 commit 483507a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 8 deletions.
4 changes: 3 additions & 1 deletion docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -1571,13 +1571,15 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
</tr>
<tr>
<td><code>preceding</code></td>
<td>Required</td>
<td>Optional</td>
<td>
<p>Defines the interval of rows that are included in the window and precede the current row. The interval can either be specified as time or row-count interval.</p>

<p><a href="tableApi.html#bounded-over-windows">Bounded over windows</a> are specified with the size of the interval, e.g., <code>10.minutes</code> for a time interval or <code>10.rows</code> for a row-count interval.</p>

<p><a href="tableApi.html#unbounded-over-windows">Unbounded over windows</a> are specified using a constant, i.e., <code>UNBOUNDED_RANGE</code> for a time interval or <code>UNBOUNDED_ROW</code> for a row-count interval. Unbounded over windows start with the first row of a partition.</p>
<p>If the <code>preceding</code> clause is omitted, <code>UNBOUNDED_RANGE</code> and <code>CURRENT_RANGE</code> are used as the default <code>preceding</code> and <code>following</code> for the window.</p>
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.flink.table.api.java

import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE}
import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}

/**
Expand Down Expand Up @@ -144,4 +145,21 @@ class OverWindowWithOrderBy(
new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
}

/**
* Assigns an alias for this window that the following `select()` clause can refer to.
*
* @param alias alias for this over window
* @return over window
*/
def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

/**
* Assigns an alias for this window that the following `select()` clause can refer to.
*
* @param alias alias for this over window
* @return over window
*/
def as(alias: Expression): OverWindow = {
OverWindow(alias, partitionByExpr, orderByExpr, UNBOUNDED_RANGE, CURRENT_RANGE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.table.api.scala

import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}

/**
* Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
Expand Down Expand Up @@ -127,7 +127,6 @@ case class PartitionedOver(partitionBy: Array[Expression]) {

case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) {


/**
* Set the preceding offset (based on time or row-count intervals) for over window.
*
Expand All @@ -138,4 +137,21 @@ case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expressi
new OverWindowWithPreceding(partitionBy, orderBy, preceding)
}

/**
* Assigns an alias for this window that the following `select()` clause can refer to.
*
* @param alias alias for this over window
* @return over window
*/
def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

/**
* Assigns an alias for this window that the following `select()` clause can refer to.
*
* @param alias alias for this over window
* @return over window
*/
def as(alias: Expression): OverWindow = {
OverWindow(alias, partitionBy, orderBy, UNBOUNDED_RANGE, CURRENT_RANGE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,15 @@ class OverWindowTest extends TableTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"FROM MyTable " +
"WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)"

val sql3 = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY proctime) as cnt1, " +
"sum(a) OVER (PARTITION BY c ORDER BY proctime) as cnt2 " +
"from MyTable"

streamUtil.verifySqlPlansIdentical(sql, sql2)
streamUtil.verifySqlPlansIdentical(sql, sql3)

val expected =
unaryNode(
Expand Down Expand Up @@ -523,6 +531,13 @@ class OverWindowTest extends TableTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"

val sql1 = "SELECT " +
"c, " +
"count(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt1, " +
"sum(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt2 " +
"from MyTable"
streamUtil.verifySqlPlansIdentical(sql, sql1)

val expected =
unaryNode(
"DataStreamCalc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,15 @@ class OverWindowTest extends TableTestBase {
val weightedAvg = new WeightedAvgWithRetract

val result = table
.window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE following
CURRENT_RANGE as 'w)
.window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
.select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)

val result2 = table
.window(Over partitionBy 'c orderBy 'proctime as 'w)
.select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)

streamUtil.verify2Tables(result, result2)

val expected =
unaryNode(
"DataStreamCalc",
Expand Down Expand Up @@ -459,6 +464,12 @@ class OverWindowTest extends TableTestBase {
CURRENT_RANGE as 'w)
.select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)

val result2 = table
.window(Over partitionBy 'c orderBy 'rowtime as 'w)
.select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)

streamUtil.verify2Tables(result, result2)

val expected =
unaryNode(
"DataStreamCalc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
}

@Test
def testUnboundedOverRange(): Unit = {
def testRowTimeUnboundedOverRange(): Unit = {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)

Expand All @@ -134,8 +134,37 @@ class OverWindowStringExpressionTest extends TableTestBase {
.window(
JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
.select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
val resJava2 = t
.window(
JOver.orderBy("rowtime").as("w"))
.select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")

verifyTableEquals(resScala, resJava)
verifyTableEquals(resScala, resJava2)
}

@Test
def testProcTimeUnboundedOverRange(): Unit = {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'proctime.proctime)

val weightAvgFun = new WeightedAvg
util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)

val resScala = t
.window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
.select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(
JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w"))
.select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
val resJava2 = t
.window(
JOver.orderBy("proctime").as("w"))
.select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")

verifyTableEquals(resScala, resJava)
verifyTableEquals(resScala, resJava2)
}

@Test
Expand Down

0 comments on commit 483507a

Please sign in to comment.