From f8280313b76dda98323e8b4c0f8b23193ebe57a8 Mon Sep 17 00:00:00 2001 From: Kostas Kloudas Date: Tue, 27 Nov 2018 13:18:45 +0100 Subject: [PATCH] [FLINK-10584][table] Add State Retention support to TemporalRowtimeJoin. --- ...nputStreamOperatorWithStateRetention.scala | 3 +- .../runtime/join/TemporalRowtimeJoin.scala | 40 ++++-- .../harness/TemporalJoinHarnessTest.scala | 128 ++++++++++++++++++ .../TwoInputStreamOperatorTestHarness.java | 5 + 4 files changed, 160 insertions(+), 16 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala index fda5c2e168650..f44d7997c8f6a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala @@ -58,13 +58,14 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention( private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime - private val stateCleaningEnabled: Boolean = minRetentionTime > 1 private val CLEANUP_TIMESTAMP = "cleanup-timestamp" private val TIMERS_STATE_NAME = "timers" private var latestRegisteredCleanUpTimer: ValueState[JLong] = _ + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + protected var timerService: SimpleTimerService = _ override def open(): Unit = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala index 66b60d44969fa..f6911096f9371 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala @@ -26,8 +26,7 @@ import org.apache.flink.api.common.functions.util.FunctionUtils import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} -import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.runtime.state.VoidNamespace import org.apache.flink.streaming.api.operators._ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.table.api.StreamQueryConfig @@ -69,9 +68,7 @@ class TemporalRowtimeJoin( queryConfig: StreamQueryConfig, leftTimeAttribute: Int, rightTimeAttribute: Int) - extends AbstractStreamOperator[CRow] - with TwoInputStreamOperator[CRow, CRow, CRow] - with Triggerable[Any, VoidNamespace] + extends BaseTwoInputStreamOperatorWithStateRetention(queryConfig) with Compiler[FlatJoinFunction[Row, Row, Row]] with Logging { @@ -112,7 +109,6 @@ class TemporalRowtimeJoin( private var cRowWrapper: CRowWrappingCollector = _ private var collector: TimestampedCollector[CRow] = _ - private var timerService: SimpleTimerService = _ private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ @@ -142,12 +138,7 @@ class TemporalRowtimeJoin( cRowWrapper.out = collector cRowWrapper.setChange(true) - val internalTimerService = getInternalTimerService( - TIMERS_STATE_NAME, - VoidNamespaceSerializer.INSTANCE, - this) - - timerService = new SimpleTimerService(internalTimerService) + super.open() } override def processElement1(element: StreamRecord[CRow]): Unit = { @@ -155,6 +146,8 @@ class TemporalRowtimeJoin( leftState.put(getNextLeftIndex, element.getValue.row) registerSmallestTimer(getLeftTime(element.getValue.row)) // Timer to emit and clean up the state + + registerProcessingCleanUpTimer() } override def processElement2(element: StreamRecord[CRow]): Unit = { @@ -163,10 +156,8 @@ class TemporalRowtimeJoin( val rowTime = getRightTime(element.getValue.row) rightState.put(rowTime, element.getValue.row) registerSmallestTimer(rowTime) // Timer to clean up the state - } - override def onProcessingTime(timer: InternalTimer[Any, VoidNamespace]): Unit = { - throw new IllegalStateException("This should never happen") + registerProcessingCleanUpTimer() } override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = { @@ -175,6 +166,15 @@ class TemporalRowtimeJoin( if (lastUnprocessedTime < Long.MaxValue) { registerTimer(lastUnprocessedTime) } + + // if we have more state at any side, then update the timer, else clean it up. + if (stateCleaningEnabled) { + if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) { + registerProcessingCleanUpTimer() + } else { + cleanUpLastTimer() + } + } } override def close(): Unit = { @@ -245,6 +245,16 @@ class TemporalRowtimeJoin( } } + /** + * The method to be called when a cleanup timer fires. + * + * @param time The timestamp of the fired timer. + */ + override def cleanUpState(time: Long): Unit = { + leftState.clear() + rightState.clear() + } + private def firstIndexToKeep(timerTimestamp: Long, rightRowsSorted: util.List[Row]): Int = { val firstIndexNewerThenTimer = indexOfFirstElementNewerThanTimer(timerTimestamp, rightRowsSorted) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala index 0d46db4197079..7b942c284ea57 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala @@ -530,6 +530,134 @@ class TemporalJoinHarnessTest extends HarnessTestBase { 0) } + // ---------------------- Row time TTL tests ---------------------- + + @Test + def testRowTimeJoinCleanupTimerUpdatedFromProbeSide(): Unit = { + // min=2ms max=4ms + val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + + testHarness.open() + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + testHarness.setProcessingTime(1L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) + testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + + testHarness.processBothWatermarks(new Watermark(2L)) + + // this should update the clean-up timer to 8 + testHarness.setProcessingTime(4L) + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + + // this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8) + testHarness.setProcessingTime(5L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L))) + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L))) + + testHarness.processBothWatermarks(new Watermark(5L)) + + // this should now clean up the state + testHarness.setProcessingTime(8L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match + + testHarness.processBothWatermarks(new Watermark(10L)) + + verify(expectedOutput, testHarness.getOutput) + + testHarness.close() + } + + @Test + def testRowTimeJoinCleanupTimerUpdatedFromBuildSide(): Unit = { + // min=2ms max=4ms + val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + + testHarness.open() + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + testHarness.setProcessingTime(1L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) + testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + + testHarness.processBothWatermarks(new Watermark(2L)) + + // this should update the clean-up timer to 8 + testHarness.setProcessingTime(4L) + testHarness.processElement2(new StreamRecord(CRow("Euro", 117L, 4L))) + + // this should now do nothing + testHarness.setProcessingTime(5L) + + // so this should be joined with the "old" value + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 3L))) + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 3L, "Euro", 114L, 0L))) + + testHarness.processBothWatermarks(new Watermark(5L)) + + // this should now clean up the state + testHarness.setProcessingTime(8L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match + + testHarness.processBothWatermarks(new Watermark(10L)) + + verify(expectedOutput, testHarness.getOutput) + + testHarness.close() + } + + @Test + def testRowTimeJoinCleanupTimerUpdatedAfterEvaluation(): Unit = { + // min=2ms max=4ms + val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + + testHarness.open() + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + testHarness.setProcessingTime(1L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) + testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + + testHarness.setProcessingTime(4L) + + // this should trigger an evaluation, which should also update the clean-up timer to 8 + testHarness.processBothWatermarks(new Watermark(2L)) + + // this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8) + testHarness.setProcessingTime(5L) + + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 3L))) + expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 3L, "Euro", 114L, 0L))) + + testHarness.processBothWatermarks(new Watermark(5L)) + + // this should now clean up the state + testHarness.setProcessingTime(8L) + + // so this should not find any match + testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) + + testHarness.processBothWatermarks(new Watermark(10L)) + + verify(expectedOutput, testHarness.getOutput) + + testHarness.close() + } + def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER) : (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, CRow]) = { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 7bb697331da6c..9289252479a9b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -64,4 +64,9 @@ public void processWatermark1(Watermark mark) throws Exception { public void processWatermark2(Watermark mark) throws Exception { twoInputOperator.processWatermark2(mark); } + + public void processBothWatermarks(Watermark mark) throws Exception { + twoInputOperator.processWatermark1(mark); + twoInputOperator.processWatermark2(mark); + } }