Skip to content

Commit

Permalink
[FLINK-7776] [table] Prevent emission of identical update records in …
Browse files Browse the repository at this point in the history
…group aggregation.

This closes apache#4785.
  • Loading branch information
Xpray authored and fhueske committed Oct 10, 2017
1 parent 1ea7f49 commit 4047be4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,19 @@ class GroupAggProcessFunction(
state.update(accumulators)
cntState.update(inputCnt)

// if this was not the first row and we have to emit retractions
if (generateRetraction && !firstRow) {
// if this was not the first row
if (!firstRow) {
if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
// newRow is the same as before and state cleaning is not enabled.
// We do not emit retraction and acc message.
// We emit nothing
// If state cleaning is enabled, we have to emit messages to prevent too early
// state eviction of downstream operators.
return
} else {
// retract previous result
out.collect(prevRow)
if (generateRetraction) {
out.collect(prevRow)
}
}
}
// emit the new result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,29 @@ class AggregateITCase extends StreamingWithStateTestBase {
// verify agg close is called
assert(JavaUserDefinedAggFunctions.isCloseCalled)
}

/**
  * Runs a grouped `max` aggregation into an upsert sink and checks that exactly one
  * record per grouping key is collected.
  *
  * Keys "B" and "C" each receive two input rows carrying the same `b` value, so the
  * second row of each key leaves the aggregate unchanged; the expected output contains
  * no second record for those keys.
  */
@Test
def testRemoveDuplicateRecordsWithUpsertSink(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStateBackend(getStateBackend)
  val tEnv = TableEnvironment.getTableEnvironment(env)
  StreamITCase.clear

  // Input rows (a, b, c): the pairs for "B" and "C" share the same b, hence the same max.
  val data = new mutable.MutableList[(Int, Long, String)]
  Seq(
    (1, 1L, "A"),
    (2, 2L, "B"),
    (3, 2L, "B"),
    (4, 3L, "C"),
    (5, 3L, "C")
  ).foreach(data += _)

  // Group by c and keep the maximum b per group.
  val t = env
    .fromCollection(data)
    .toTable(tEnv, 'a, 'b, 'c)
    .groupBy('c)
    .select('c, 'b.max)

  // NOTE(review): the second TestUpsertSink argument is passed through unchanged;
  // its meaning is defined by TestUpsertSink, which is not visible here.
  t.writeToSink(new TestUpsertSink(Array("c"), false))
  env.execute()

  // Compare order-insensitively: both sides are sorted before the assertion.
  val expected = List("(true,A,1)", "(true,B,2)", "(true,C,3)")
  val actual = RowCollector.getAndClearValues.map(_.toString)
  assertEquals(expected.sorted, actual.sorted)
}
}

0 comments on commit 4047be4

Please sign in to comment.