Skip to content

Commit

Permalink
[FLINK-10584][table] Add State Retention support to TemporalRowtimeJoin.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jan 9, 2019
1 parent 81ec2d7 commit f828031
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention(

private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
private val stateCleaningEnabled: Boolean = minRetentionTime > 1

private val CLEANUP_TIMESTAMP = "cleanup-timestamp"
private val TIMERS_STATE_NAME = "timers"

private var latestRegisteredCleanUpTimer: ValueState[JLong] = _

protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

protected var timerService: SimpleTimerService = _

override def open(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
import org.apache.flink.streaming.api.SimpleTimerService
import org.apache.flink.runtime.state.VoidNamespace
import org.apache.flink.streaming.api.operators._
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.StreamQueryConfig
Expand Down Expand Up @@ -69,9 +68,7 @@ class TemporalRowtimeJoin(
queryConfig: StreamQueryConfig,
leftTimeAttribute: Int,
rightTimeAttribute: Int)
extends AbstractStreamOperator[CRow]
with TwoInputStreamOperator[CRow, CRow, CRow]
with Triggerable[Any, VoidNamespace]
extends BaseTwoInputStreamOperatorWithStateRetention(queryConfig)
with Compiler[FlatJoinFunction[Row, Row, Row]]
with Logging {

Expand Down Expand Up @@ -112,7 +109,6 @@ class TemporalRowtimeJoin(

private var cRowWrapper: CRowWrappingCollector = _
private var collector: TimestampedCollector[CRow] = _
private var timerService: SimpleTimerService = _

private var joinFunction: FlatJoinFunction[Row, Row, Row] = _

Expand Down Expand Up @@ -142,19 +138,16 @@ class TemporalRowtimeJoin(
cRowWrapper.out = collector
cRowWrapper.setChange(true)

val internalTimerService = getInternalTimerService(
TIMERS_STATE_NAME,
VoidNamespaceSerializer.INSTANCE,
this)

timerService = new SimpleTimerService(internalTimerService)
super.open()
}

override def processElement1(element: StreamRecord[CRow]): Unit = {
checkNotRetraction(element)

leftState.put(getNextLeftIndex, element.getValue.row)
registerSmallestTimer(getLeftTime(element.getValue.row)) // Timer to emit and clean up the state

registerProcessingCleanUpTimer()
}

override def processElement2(element: StreamRecord[CRow]): Unit = {
Expand All @@ -163,10 +156,8 @@ class TemporalRowtimeJoin(
val rowTime = getRightTime(element.getValue.row)
rightState.put(rowTime, element.getValue.row)
registerSmallestTimer(rowTime) // Timer to clean up the state
}

override def onProcessingTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
throw new IllegalStateException("This should never happen")
registerProcessingCleanUpTimer()
}

override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
Expand All @@ -175,6 +166,15 @@ class TemporalRowtimeJoin(
if (lastUnprocessedTime < Long.MaxValue) {
registerTimer(lastUnprocessedTime)
}

// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) {
registerProcessingCleanUpTimer()
} else {
cleanUpLastTimer()
}
}
}

override def close(): Unit = {
Expand Down Expand Up @@ -245,6 +245,16 @@ class TemporalRowtimeJoin(
}
}

/**
* The method to be called when a cleanup timer fires.
*
* @param time The timestamp of the fired timer.
*/
override def cleanUpState(time: Long): Unit = {
leftState.clear()
rightState.clear()
}

private def firstIndexToKeep(timerTimestamp: Long, rightRowsSorted: util.List[Row]): Int = {
val firstIndexNewerThenTimer =
indexOfFirstElementNewerThanTimer(timerTimestamp, rightRowsSorted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,134 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
0)
}

// ---------------------- Row time TTL tests ----------------------

@Test
def testRowTimeJoinCleanupTimerUpdatedFromProbeSide(): Unit = {
// min=2ms max=4ms
val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())

testHarness.open()
val expectedOutput = new ConcurrentLinkedQueue[Object]()

testHarness.setProcessingTime(1L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))

expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))

testHarness.processBothWatermarks(new Watermark(2L))

// this should update the clean-up timer to 8
testHarness.setProcessingTime(4L)
testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))

expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))

// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8)
testHarness.setProcessingTime(5L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L)))
expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L)))

testHarness.processBothWatermarks(new Watermark(5L))

// this should now clean up the state
testHarness.setProcessingTime(8L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match

testHarness.processBothWatermarks(new Watermark(10L))

verify(expectedOutput, testHarness.getOutput)

testHarness.close()
}

@Test
def testRowTimeJoinCleanupTimerUpdatedFromBuildSide(): Unit = {
// min=2ms max=4ms
val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())

testHarness.open()
val expectedOutput = new ConcurrentLinkedQueue[Object]()

testHarness.setProcessingTime(1L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))

expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))

testHarness.processBothWatermarks(new Watermark(2L))

// this should update the clean-up timer to 8
testHarness.setProcessingTime(4L)
testHarness.processElement2(new StreamRecord(CRow("Euro", 117L, 4L)))

// this should now do nothing
testHarness.setProcessingTime(5L)

// so this should be joined with the "old" value
testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 3L)))
expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 3L, "Euro", 114L, 0L)))

testHarness.processBothWatermarks(new Watermark(5L))

// this should now clean up the state
testHarness.setProcessingTime(8L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match

testHarness.processBothWatermarks(new Watermark(10L))

verify(expectedOutput, testHarness.getOutput)

testHarness.close()
}

@Test
def testRowTimeJoinCleanupTimerUpdatedAfterEvaluation(): Unit = {
// min=2ms max=4ms
val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())

testHarness.open()
val expectedOutput = new ConcurrentLinkedQueue[Object]()

testHarness.setProcessingTime(1L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))

expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))

testHarness.setProcessingTime(4L)

// this should trigger an evaluation, which should also update the clean-up timer to 8
testHarness.processBothWatermarks(new Watermark(2L))

// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8)
testHarness.setProcessingTime(5L)

testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 3L)))
expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 3L, "Euro", 114L, 0L)))

testHarness.processBothWatermarks(new Watermark(5L))

// this should now clean up the state
testHarness.setProcessingTime(8L)

// so this should not find any match
testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L)))

testHarness.processBothWatermarks(new Watermark(10L))

verify(expectedOutput, testHarness.getOutput)

testHarness.close()
}

def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
: (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, CRow]) = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public void processWatermark1(Watermark mark) throws Exception {
public void processWatermark2(Watermark mark) throws Exception {
twoInputOperator.processWatermark2(mark);
}

public void processBothWatermarks(Watermark mark) throws Exception {
twoInputOperator.processWatermark1(mark);
twoInputOperator.processWatermark2(mark);
}
}

0 comments on commit f828031

Please sign in to comment.