Skip to content

Commit

Permalink
[FLINK-10543][table] Leverage efficient timer deletion in relational …
Browse files Browse the repository at this point in the history
…operators

This closes apache#6918
  • Loading branch information
hequn8128 authored and sunjincheng121 committed Dec 9, 2018
1 parent eaead20 commit 5716e4d
Show file tree
Hide file tree
Showing 24 changed files with 423 additions and 430 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.aggregate

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import java.lang.{Long => JLong}

import org.apache.flink.streaming.api.TimerService

/**
* Base class for clean up state, both for [[ProcessFunction]] and [[CoProcessFunction]].
*/
trait CleanupState {

def registerProcessingCleanupTimer(
cleanupTimeState: ValueState[JLong],
currentTime: Long,
minRetentionTime: Long,
maxRetentionTime: Long,
timerService: TimerService): Unit = {

// last registered timer
val curCleanupTime = cleanupTimeState.value()

// check if a cleanup timer is registered and
// that the current cleanup timer won't delete state we need to keep
if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
// we need to register a new (later) timer
val cleanupTime = currentTime + maxRetentionTime
// register timer and remember clean-up time
timerService.registerProcessingTimeTimer(cleanupTime)
// delete expired timer
if (curCleanupTime != null) {
timerService.deleteProcessingTimeTimer(curCleanupTime)
}
cleanupTimeState.update(cleanupTime)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.aggregate

import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, Types}

abstract class CoProcessFunctionWithCleanupState[IN1, IN2, OUT](queryConfig: StreamQueryConfig)
extends CoProcessFunction[IN1, IN2, OUT]
with CleanupState {

protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

// holds the latest registered cleanup timer
private var cleanupTimeState: ValueState[JLong] = _

protected def initCleanupTimeState(stateName: String) {
if (stateCleaningEnabled) {
val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
new ValueStateDescriptor[JLong](stateName, Types.LONG)
cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
}
}

protected def processCleanupTimer(
ctx: CoProcessFunction[IN1, IN2, OUT]#Context,
currentTime: Long): Unit = {
if (stateCleaningEnabled) {
registerProcessingCleanupTimer(
cleanupTimeState,
currentTime,
minRetentionTime,
maxRetentionTime,
ctx.timerService()
)
}
}

protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
ctx.timeDomain() == TimeDomain.PROCESSING_TIME
}

protected def cleanupState(states: State*): Unit = {
// clear all state
states.foreach(_.clear())
this.cleanupTimeState.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class GroupAggProcessFunction(

val currentTime = ctx.timerService().currentProcessingTime()
// register state-cleanup timer
registerProcessingCleanupTimer(ctx, currentTime)
processCleanupTimer(ctx, currentTime)

val input = inputC.row

Expand Down Expand Up @@ -172,7 +172,7 @@ class GroupAggProcessFunction(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {

if (needToCleanupState(timestamp)) {
if (stateCleaningEnabled) {
cleanupState(state, cntState)
function.cleanup()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFu
import org.apache.flink.table.api.{StreamQueryConfig, Types}

abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: StreamQueryConfig)
extends KeyedProcessFunction[K, I, O] {
extends KeyedProcessFunction[K, I, O]
with CleanupState {

protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

// holds the latest registered cleanup timer
private var cleanupTimeState: ValueState[JLong] = _
protected var cleanupTimeState: ValueState[JLong] = _

protected def initCleanupTimeState(stateName: String) {
if (stateCleaningEnabled) {
Expand All @@ -41,40 +43,24 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
}
}

protected def registerProcessingCleanupTimer(
protected def processCleanupTimer(
ctx: KeyedProcessFunction[K, I, O]#Context,
currentTime: Long): Unit = {
if (stateCleaningEnabled) {

// last registered timer
val curCleanupTime = cleanupTimeState.value()

// check if a cleanup timer is registered and
// that the current cleanup timer won't delete state we need to keep
if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
// we need to register a new (later) timer
val cleanupTime = currentTime + maxRetentionTime
// register timer and remember clean-up time
ctx.timerService().registerProcessingTimeTimer(cleanupTime)
cleanupTimeState.update(cleanupTime)
}
registerProcessingCleanupTimer(
cleanupTimeState,
currentTime,
minRetentionTime,
maxRetentionTime,
ctx.timerService()
)
}
}

protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
ctx.timeDomain() == TimeDomain.PROCESSING_TIME
}

protected def needToCleanupState(timestamp: Long): Boolean = {
if (stateCleaningEnabled) {
val cleanupTime = cleanupTimeState.value()
// check that the triggered timer is the last registered processing time timer.
null != cleanupTime && timestamp == cleanupTime
} else {
false
}
}

protected def cleanupState(states: State*): Unit = {
// clear all state
states.foreach(_.clear())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ProcTimeBoundedRangeOver(

val currentTime = ctx.timerService.currentProcessingTime
// register state-cleanup timer
registerProcessingCleanupTimer(ctx, currentTime)
processCleanupTimer(ctx, currentTime)

// buffer the event incoming event

Expand All @@ -117,11 +117,14 @@ class ProcTimeBoundedRangeOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {

if (needToCleanupState(timestamp)) {
// clean up and return
cleanupState(rowMapState, accumulatorState)
function.cleanup()
return
if (stateCleaningEnabled) {
val cleanupTime = cleanupTimeState.value()
if (null != cleanupTime && timestamp == cleanupTime) {
// clean up and return
cleanupState(rowMapState, accumulatorState)
function.cleanup()
return
}
}

// remove timestamp set outside of ProcessFunction.
Expand All @@ -131,11 +134,10 @@ class ProcTimeBoundedRangeOver(
// that have registered this time trigger 1 ms ago

val currentTime = timestamp - 1
var i = 0
// get the list of elements of current proctime
val currentElements = rowMapState.get(currentTime)

// Expired clean-up timers pass the needToCleanupState() check.
// Expired clean-up timers pass the needToCleanupState check.
// Perform a null check to verify that we have data to process.
if (null == currentElements) {
return
Expand All @@ -156,7 +158,6 @@ class ProcTimeBoundedRangeOver(
// and eliminate them. Multiple elements could have been received at the same timestamp
// the removal of old elements happens only once per proctime as onTimer is called only once
val iter = rowMapState.iterator
val markToRemove = new ArrayList[Long]()
while (iter.hasNext) {
val entry = iter.next()
val elementKey = entry.getKey
Expand All @@ -169,17 +170,9 @@ class ProcTimeBoundedRangeOver(
function.retract(accumulators, retractRow)
iRemove += 1
}
// mark element for later removal not to modify the iterator over MapState
markToRemove.add(elementKey)
iter.remove()
}
}
// need to remove in 2 steps not to have concurrent access errors via iterator to the MapState
i = 0
while (i < markToRemove.size()) {
rowMapState.remove(markToRemove.get(i))
i += 1
}


// add current elements to aggregator. Multiple elements might
// have arrived in the same proctime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ProcTimeBoundedRowsOver(
val currentTime = ctx.timerService.currentProcessingTime

// register state-cleanup timer
registerProcessingCleanupTimer(ctx, currentTime)
processCleanupTimer(ctx, currentTime)

// initialize state for the processed element
var accumulators = accumulatorState.value
Expand Down Expand Up @@ -187,7 +187,7 @@ class ProcTimeBoundedRowsOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {

if (needToCleanupState(timestamp)) {
if (stateCleaningEnabled) {
cleanupState(rowMapState, accumulatorState, counterState, smallestTsState)
function.cleanup()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ProcTimeUnboundedOver(
out: Collector[CRow]): Unit = {

// register state-cleanup timer
registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
processCleanupTimer(ctx, ctx.timerService().currentProcessingTime())

val input = inputC.row

Expand All @@ -95,7 +95,7 @@ class ProcTimeUnboundedOver(
ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {

if (needToCleanupState(timestamp)) {
if (stateCleaningEnabled) {
cleanupState(state)
function.cleanup()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,42 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, Types}

abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryConfig)
extends ProcessFunction[IN, OUT]{
extends ProcessFunction[IN, OUT]
with CleanupState {

protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

// holds the latest registered cleanup timer
private var cleanupTimeState: ValueState[JLong] = _
protected var cleanupTimeState: ValueState[JLong] = _

protected def initCleanupTimeState(stateName: String) {
if (stateCleaningEnabled) {
val inputCntDescriptor: ValueStateDescriptor[JLong] =
val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
new ValueStateDescriptor[JLong](stateName, Types.LONG)
cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
}
}

protected def registerProcessingCleanupTimer(
protected def processCleanupTimer(
ctx: ProcessFunction[IN, OUT]#Context,
currentTime: Long): Unit = {
if (stateCleaningEnabled) {

// last registered timer
val curCleanupTime = cleanupTimeState.value()

// check if a cleanup timer is registered and
// that the current cleanup timer won't delete state we need to keep
if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
// we need to register a new (later) timer
val cleanupTime = currentTime + maxRetentionTime
// register timer and remember clean-up time
ctx.timerService().registerProcessingTimeTimer(cleanupTime)
cleanupTimeState.update(cleanupTime)
}
registerProcessingCleanupTimer(
cleanupTimeState,
currentTime,
minRetentionTime,
maxRetentionTime,
ctx.timerService()
)
}
}

protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
ctx.timeDomain() == TimeDomain.PROCESSING_TIME
}

protected def needToCleanupState(timestamp: Long): Boolean = {
if (stateCleaningEnabled) {
val cleanupTime = cleanupTimeState.value()
// check that the triggered timer is the last registered processing time timer.
null != cleanupTime && timestamp == cleanupTime
} else {
false
}
}

protected def cleanupState(states: State*): Unit = {
// clear all state
states.foreach(_.clear())
Expand Down
Loading

0 comments on commit 5716e4d

Please sign in to comment.