Skip to content

Commit

Permalink
[FLINK-8366] [table] Fix UpsertTableSink tests.
Browse files Browse the repository at this point in the history
This closes apache#5244.
  • Loading branch information
军长 authored and fhueske committed Apr 17, 2018
1 parent 518adb0 commit 3adc21d
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TableSinkITCase extends AbstractTestBase {
val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
.where('a < 3 || 'a > 19)
.select('c, 't, 'b)
.insertInto("targetTable")
Expand Down Expand Up @@ -96,7 +96,7 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4

val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))
Expand Down Expand Up @@ -677,11 +677,10 @@ object RowCollector {
/** Converts a list of upsert messages into a list of final results. */
def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {

def getKeys(r: Row): List[String] =
keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
def getKeys(r: Row): Row = Row.project(r, keys)

val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
val key = getKeys(r.f1).mkString("")
val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
val key = getKeys(r.f1)
if (r.f0) {
o + (key -> r.f1.toString)
} else {
Expand Down

0 comments on commit 3adc21d

Please sign in to comment.