Skip to content

Commit

Permalink
[FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side…
Browse files Browse the repository at this point in the history
… output.

With this, the CEP library assumes correctness of the watermark
and considers as late, events that arrive having a timestamp
smaller than that of the last seen watermark. Late events are not
silently dropped, but the user can specify to send them to a side
output.
  • Loading branch information
kl0u committed Mar 31, 2017
1 parent 1932240 commit 4889028
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 34 deletions.
53 changes: 53 additions & 0 deletions docs/dev/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,59 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
</div>
</div>

### Handling Lateness in Event Time

In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order
when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending
order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller
than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.

<span class="label label-danger">Attention</span> The library assumes correctness of the watermark when working
in event time.

To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
*correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
seen watermark. Late elements are not further processed but they can be redirected to a [side output]
({{ site.baseurl }}/dev/stream/side_output.html), dedicated to them.

To access the stream of late elements, you first need to specify that you want to get the late data using
`.withLateDataOutputTag(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do
so, the late elements will be silently dropped. Then, you can get the side-output stream using the
`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in
the `.withLateDataOutputTag(OutputTag)`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

PatternStream<T> patternStream = CEP.pattern(...)
.withLateDataOutputTag(lateOutputTag);

// main output with matches
DataStream<O> result = patternStream.select(...)

// side output containing the late events
DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val lateOutputTag = OutputTag[T]("late-data")

val patternStream: PatternStream[T] = CEP.pattern(...)
.withLateDataOutputTag(lateOutputTag)

// main output with matches
val result = patternStream.select(...)

// side output containing the late events
val lateStream = patternStream.getSideOutput(lateOutputTag)
{% endhighlight %}
</div>
</div>

## Examples

The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import java.util.{Map => JMap}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.pattern.{Pattern => JPattern}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flink.types.{Either => FEither}
import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
import java.lang.{Long => JLong}

import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern

Expand All @@ -45,8 +46,23 @@ import scala.collection.mutable
*/
class PatternStream[T](jPatternStream: JPatternStream[T]) {

private[flink] var lateDataOutputTag: OutputTag[T] = null

private[flink] def wrappedPatternStream = jPatternStream


/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. The
* CEP library assumes correctness of the watermark, so an element is considered late if its
* timestamp is smaller than the last received watermark.
*/
@PublicEvolving
def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = {
jPatternStream.withLateDataOutputTag(outputTag)
lateDataOutputTag = outputTag
this
}

def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])

def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
Expand Down Expand Up @@ -93,7 +109,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {

val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern())
jPatternStream.getPattern(),
lateDataOutputTag)

val cleanedSelect = cleanClosure(patternSelectFunction)
val cleanedTimeout = cleanClosure(patternTimeoutFunction)
Expand Down Expand Up @@ -158,7 +175,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
: DataStream[Either[L, R]] = {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern()
jPatternStream.getPattern(),
lateDataOutputTag
)

val cleanedSelect = cleanClosure(patternFlatSelectFunction)
Expand Down Expand Up @@ -317,6 +335,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {

flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
}

/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
*
* @param tag The tag identifying a specific side output.
*/
@PublicEvolving
def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
asScalaStream(jPatternStream.getSideOutput(tag))
}
}

object PatternStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.flink.cep.operator.CEPOperatorUtils;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import java.util.Map;

Expand All @@ -50,6 +53,19 @@ public class PatternStream<T> {

private final Pattern<T, ?> pattern;

/**
* A reference to the created pattern stream used to get
* the registered side outputs, e.g late elements side output.
*/
private SingleOutputStreamOperator<?> patternStream;

/**
* {@link OutputTag} to use for late arriving events. Elements for which
* {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
* be emitted to this.
*/
private OutputTag<T> lateDataOutputTag;

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
this.inputStream = inputStream;
this.pattern = pattern;
Expand All @@ -63,6 +79,22 @@ public DataStream<T> getInputStream() {
return inputStream;
}

/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. The
* CEP library assumes correctness of the watermark, so an element is considered late if its
* timestamp is smaller than the last received watermark.
*/
public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) {
Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
Preconditions.checkArgument(lateDataOutputTag == null,
"The late side output tag has already been initialized to " + lateDataOutputTag + ".");
Preconditions.checkArgument(patternStream == null,
"The late side output tag has to be set before calling select() or flatSelect().");

this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
return this;
}

/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
Expand All @@ -74,7 +106,7 @@ public DataStream<T> getInputStream() {
* @return {@link DataStream} which contains the resulting elements from the pattern select
* function.
*/
public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction

Expand Down Expand Up @@ -102,8 +134,10 @@ public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectF
* @return {@link DataStream} which contains the resulting elements from the pattern select
* function.
*/
public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, T>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;

return patternStream.map(
new PatternSelectMapper<>(
Expand All @@ -129,11 +163,13 @@ public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectF
* @return {@link DataStream} which contains the resulting elements or the resulting timeout
* elements wrapped in an {@link Either} type.
*/
public <L, R> DataStream<Either<L, R>> select(
public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
final PatternTimeoutFunction<T, L> patternTimeoutFunction,
final PatternSelectFunction<T, R> patternSelectFunction) {

DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
Expand Down Expand Up @@ -174,7 +210,7 @@ public <L, R> DataStream<Either<L, R>> select(
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
Expand All @@ -201,8 +237,10 @@ public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patter
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, T>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;

return patternStream.flatMap(
new PatternFlatSelectMapper<>(
Expand All @@ -229,11 +267,13 @@ public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patter
* function or the resulting timeout events from the pattern flat timeout function wrapped in an
* {@link Either} type.
*/
public <L, R> DataStream<Either<L, R>> flatSelect(
public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {

DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
Expand Down Expand Up @@ -263,6 +303,18 @@ public <L, R> DataStream<Either<L, R>> flatSelect(
).returns(outTypeInfo);
}

/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
*
* @param sideOutputTag The tag identifying a specific side output.
*/
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
"To have the late element side output, you have to first define the main output using select() or flatSelect().");
return patternStream.getSideOutput(sideOutputTag);
}

/**
* Wrapper for a {@link PatternSelectFunction}.
*
Expand Down
Loading

0 comments on commit 4889028

Please sign in to comment.