Skip to content

Commit

Permalink
[FLINK-18632][table-planner-blink] Assign the missing RowKind when to…
Browse files Browse the repository at this point in the history
…RetractStream with POJO type

Co-authored-by: luoziyu <[email protected]>

This closes apache#12955
  • Loading branch information
lzy3261944 committed Jul 24, 2020
1 parent ede4da5 commit fc36235
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object SinkCodeGenerator {
afterIndexModify = CodeGenUtils.newName("afterIndexModify")
s"""
|${conversion.code}
|${conversion.resultTerm}.setRowKind(${inputTerm}.getRowKind());
|${classOf[RowData].getCanonicalName} $afterIndexModify = ${conversion.resultTerm};
|""".stripMargin
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,27 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
"(true,Person{name='Jack', age=3})")
assertEquals(expected.sorted, sink.getResults.sorted)
}

@Test
def testRetractMsgWithPojoType(): Unit = {
val orders = env.fromCollection(Seq(
new Order(1L, new ProductItem("beer", 10L), 1),
new Order(1L, new ProductItem("beer", 10L), 2)
))

val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)

val sink = new StringSink[(Boolean, Order)]()
tEnv.sqlQuery(s"""|SELECT user, product, sum(amount) as amount
|FROM $table
|GROUP BY user, product
|""".stripMargin).toRetractStream[Order].addSink(sink)
env.execute()

val expected = List(
"(true,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
"(false,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
"(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
assertEquals(expected.sorted, sink.getResults.sorted)
}
}

0 comments on commit fc36235

Please sign in to comment.