Skip to content

Commit

Permalink
[FLINK-6094] [table] Use the lexicographic smallest attribute as the …
Browse files Browse the repository at this point in the history
…common group id

This closes apache#5273.
  • Loading branch information
军长 authored and twalthr committed Jan 11, 2018
1 parent 00ad0eb commit e447000
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ object UpdatingPlanChecker {

/** Extracts the unique keys of the table produced by the plan. */
def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
getUniqueKeyGroups(plan).map(_.map(_._1).toArray)
}

/** Extracts the unique keys and groups of the table produced by the plan. */
def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = {
val keyExtractor = new UniqueKeyExtractor
keyExtractor.visit(plan).map(_.map(_._1).toArray)
keyExtractor.visit(plan)
}

private class AppendOnlyValidator extends RelVisitor {
Expand Down Expand Up @@ -101,7 +106,7 @@ object UpdatingPlanChecker {
.filter(io => inputKeys.get.map(e => e._1).contains(io._1))

val inputKeysMap = inputKeys.get.toMap
val inOutGroups = inputKeysAndOutput
val inOutGroups = inputKeysAndOutput.sorted.reverse
.map(e => (inputKeysMap(e._1), e._2))
.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class UpdatingPlanCheckerTest {
.select('a as 'a1, 'a as 'a2, 'b.count)

util.verifyTableUniqueKey(resultTable, Seq("a1", "a2"))
// both a1 and a2 belong to the same group, i.e., a1. We use the lexicographic smallest
// attribute as the common group id
util.verifyTableKeyGroups(resultTable, Seq(("a1", "a1"), ("a2", "a1")))
}

@Test
Expand Down Expand Up @@ -210,13 +213,25 @@ class UpdatePlanCheckerUtil extends StreamTableTestUtil {
verifyTableUniqueKey(tableEnv.sql(query), expected)
}

def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = {
val relNode = resultTable.getRelNode
val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
val actual = UpdatingPlanChecker.getUniqueKeyFields(optimized)
UpdatingPlanChecker.getUniqueKeyGroups(optimized)
}

def verifyTableKeyGroups(resultTable: Table, expected: Seq[(String, String)]): Unit = {
val actual = getKeyGroups(resultTable)
if (actual.isDefined) {
assertEquals(expected.sorted, actual.get.sorted)
} else {
assertEquals(expected.sorted, Nil)
}
}

def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
val actual = getKeyGroups(resultTable).map(_.map(_._1))
if (actual.isDefined) {
assertEquals(expected.sorted, actual.get.toSeq.sorted)
assertEquals(expected.sorted, actual.get.sorted)
} else {
assertEquals(expected.sorted, Nil)
}
Expand Down

0 comments on commit e447000

Please sign in to comment.