Skip to content

Commit

Permalink
[FLINK-7245] [table] Add operators to hold back watermarks with stati…
Browse files Browse the repository at this point in the history
…c delays.

This closes apache#4530.
  • Loading branch information
xccui authored and fhueske committed Aug 29, 2017
1 parent 1fc0b64 commit 68fdaa5
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.operators

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.streaming.api.watermark.Watermark

/**
 * A [[KeyedCoProcessOperator]] variant that holds back every forwarded watermark by a fixed,
 * non-negative amount.
 *
 * @param flatMapper     the [[CoProcessFunction]] handed to the parent operator
 * @param watermarkDelay the amount subtracted from the timestamp of each emitted watermark;
 *                       defaults to 0, in which case watermarks are forwarded unchanged
 * @throws IllegalArgumentException if `watermarkDelay` is negative
 */
class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
    private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
    private val watermarkDelay: Long = 0L)
  extends KeyedCoProcessOperator[KEY, IN1, IN2, OUT](flatMapper) {

  // Reject negative delays while the operator is being constructed.
  if (watermarkDelay < 0) {
    throw new IllegalArgumentException("The watermark delay should be non-negative.")
  }

  /** Emission strategy, selected once at construction time based on the configured delay. */
  val emitter: Watermark => Unit = watermarkDelay match {
    case 0L => emitWithoutDelay
    case _ => emitWithDelay
  }

  /** Forwards the given watermark to the output unchanged. */
  def emitWithoutDelay(mark: Watermark): Unit = {
    output.emitWatermark(mark)
  }

  /** Forwards a new watermark whose timestamp is reduced by the configured delay. */
  def emitWithDelay(mark: Watermark): Unit = {
    val delayedTimestamp = mark.getTimestamp - watermarkDelay
    output.emitWatermark(new Watermark(delayedTimestamp))
  }

  /**
   * Passes the watermark to the time service manager (when one is set) and then emits it
   * through the selected emitter.
   */
  @throws[Exception]
  override def processWatermark(mark: Watermark): Unit = {
    if (timeServiceManager != null) {
      timeServiceManager.advanceWatermark(mark)
    }
    emitter(mark)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.watermark.Watermark

/**
 * A [[KeyedProcessOperator]] that supports holding back watermarks with a static delay.
 *
 * @param function       the [[ProcessFunction]] handed to the parent operator
 * @param watermarkDelay the non-negative amount subtracted from the timestamp of every emitted
 *                       watermark; with the default of 0 watermarks are forwarded unchanged
 * @throws IllegalArgumentException if `watermarkDelay` is negative
 */
class KeyedProcessOperatorWithWatermarkDelay[KEY, IN, OUT](
    private val function: ProcessFunction[IN, OUT],
    // immutable: the delay is fixed at construction and never reassigned
    private val watermarkDelay: Long = 0L)
  extends KeyedProcessOperator[KEY, IN, OUT](function) {

  /** emits watermark without delay */
  def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)

  /** emits watermark with delay, i.e. with its timestamp reduced by `watermarkDelay` */
  def emitWithDelay(mark: Watermark): Unit = {
    output.emitWatermark(new Watermark(mark.getTimestamp - watermarkDelay))
  }

  // a negative delay is rejected at construction time
  if (watermarkDelay < 0) {
    throw new IllegalArgumentException("The watermark delay should be non-negative.")
  }

  // choose watermark emitter once, so the delay is not re-checked for every watermark
  val emitter: Watermark => Unit = if (watermarkDelay == 0) {
    emitWithoutDelay
  } else {
    emitWithDelay
  }

  /**
   * Passes the watermark to the time service manager (when one is set) and then emits it
   * through the selected emitter.
   */
  @throws[Exception]
  override def processWatermark(mark: Watermark): Unit = {
    if (timeServiceManager != null) timeServiceManager.advanceWatermark(mark)

    emitter(mark)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.operators

import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TestHarnessUtil}
import org.apache.flink.util.{Collector, TestLogger}
import org.junit.Test

/**
 * Unit tests for [[KeyedCoProcessOperatorWithWatermarkDelay]].
 */
class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {

  /** Sends watermarks to both inputs of an operator with delay 100 and checks the output. */
  @Test
  def testHoldingBackWatermarks(): Unit = {
    val delayedOperator =
      new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
        new EmptyCoProcessFunction, 100)
    val harness = new KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String](
      delayedOperator,
      new IntToStringKeySelector,
      new CoIdentityKeySelector[String],
      BasicTypeInfo.STRING_TYPE_INFO)

    harness.setup()
    harness.open()

    // alternate between the first and the second input, in this exact order
    Seq((101L, 202L), (103L, 204L)).foreach { case (first, second) =>
      harness.processWatermark1(new Watermark(first))
      harness.processWatermark2(new Watermark(second))
    }

    // the test expects exactly two watermarks on the output: 1 and 3
    val expected = new ConcurrentLinkedQueue[AnyRef]
    Seq(1L, 3L).foreach(ts => expected.add(new Watermark(ts)))

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput)

    harness.close()
  }

  /** Constructing the operator with a negative delay must fail. */
  @Test(expected = classOf[IllegalArgumentException])
  def testDelayParameter(): Unit = {
    val negativeDelay = -1
    new KeyedCoProcessOperatorWithWatermarkDelay[AnyRef, Integer, String, String](
      new EmptyCoProcessFunction, negativeDelay)
  }
}

/** A [[CoProcessFunction]] whose element callbacks intentionally do nothing. */
private class EmptyCoProcessFunction extends CoProcessFunction[Integer, String, String] {

  /** No-op for elements of the first input. */
  override def processElement1(
      value: Integer,
      ctx: CoProcessFunction[Integer, String, String]#Context,
      out: Collector[String]): Unit = ()

  /** No-op for elements of the second input. */
  override def processElement2(
      value: String,
      ctx: CoProcessFunction[Integer, String, String]#Context,
      out: Collector[String]): Unit = ()
}


/** Key selector that uses the string form of an [[Integer]] as its key. */
private class IntToStringKeySelector extends KeySelector[Integer, String] {

  override def getKey(value: Integer): String = {
    String.valueOf(value)
  }
}

/** Key selector that returns the element itself as the key. */
private class CoIdentityKeySelector[T] extends KeySelector[T, T] {

  override def getKey(value: T): T = {
    value
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators

import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
import org.apache.flink.util.{Collector, TestLogger}
import org.junit.Test

/**
 * Unit tests for [[KeyedProcessOperatorWithWatermarkDelay]].
 */
class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {

  /** Sends two watermarks to an operator with delay 100 and checks the output. */
  @Test
  def testHoldingBackWatermarks(): Unit = {
    val delayedOperator = new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
      new EmptyProcessFunction, 100)
    val harness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String](
      delayedOperator,
      new IdentityKeySelector,
      BasicTypeInfo.INT_TYPE_INFO)

    harness.setup()
    harness.open()

    Seq(101L, 103L).foreach(ts => harness.processWatermark(new Watermark(ts)))

    // each incoming timestamp is expected to come out reduced by the delay of 100
    val expected = new ConcurrentLinkedQueue[AnyRef]
    Seq(1L, 3L).foreach(ts => expected.add(new Watermark(ts)))

    TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput)

    harness.close()
  }

  /** Constructing the operator with a negative delay must fail. */
  @Test(expected = classOf[IllegalArgumentException])
  def testDelayParameter(): Unit = {
    val negativeDelay = -1
    new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
      new EmptyProcessFunction, negativeDelay)
  }
}

/** A [[ProcessFunction]] whose element callback intentionally does nothing. */
private class EmptyProcessFunction extends ProcessFunction[Integer, String] {

  /** No-op: the element is ignored and nothing is collected. */
  override def processElement(
      value: Integer,
      ctx: ProcessFunction[Integer, String]#Context,
      out: Collector[String]): Unit = ()
}

/** Key selector that returns the element itself as the key. */
private class IdentityKeySelector[T] extends KeySelector[T, T] {

  override def getKey(value: T): T = {
    value
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public abstract class AbstractStreamOperator<OUT>

// ---------------- time handler ------------------

private transient InternalTimeServiceManager<?, ?> timeServiceManager;
protected transient InternalTimeServiceManager<?, ?> timeServiceManager;

// ---------------- two-input operator watermarks ------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* @param <N> The type of namespace used for the timers.
*/
@Internal
class InternalTimeServiceManager<K, N> {
public class InternalTimeServiceManager<K, N> {

private final int totalKeyGroups;
private final KeyGroupsList localKeyGroupRange;
Expand Down

0 comments on commit 68fdaa5

Please sign in to comment.