From 47e9ccf0d3686cfe3e3ffb8f38e606b0f3d0fe0e Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 8 Jan 2019 12:09:15 +0100 Subject: [PATCH] [FLINK-10596][cep, docs] Added description of TimeContext to docs --- docs/dev/libs/cep.md | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index ab44b5c99da01..a358ba2bc1e0e 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -311,6 +311,8 @@ middle.oneOrMore() previously accepted events for a given potential match. The cost of this operation can vary, so when implementing your condition, try to minimize its use. +Described context gives one access to event time characteristics as well. For more info see [Time context](#time-context). + **Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides whether to accept an event or not, based *only* on properties of the event itself. @@ -1508,6 +1510,7 @@ class MyPatternProcessFunction extends PatternProcessFunction The `PatternProcessFunction` gives access to a `Context` object. Thanks to it, one can access time related characteristics such as `currentProcessingTime` or `timestamp` of current match (which is the timestamp of the last element assigned to the match). +For more info see [Time context](#time-context). Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html). @@ -1594,7 +1597,9 @@ val timeoutResult: DataStream = result.getSideOutput(outputTag) -## Handling Lateness in Event Time +## Time in CEP library + +### Handling Lateness in Event Time In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order. @@ -1620,7 +1625,6 @@ SingleOutputStreamOperator result = patternStream DataStream lateData = result.getSideOutput(lateDataOutputTag); - {% endhighlight %} @@ -1646,6 +1650,39 @@ val lateData: DataStream = result.getSideOutput(lateDataOutputTag) +### Time context + +In [PatternProcessFunction](#selecting-from-patterns) as well as in [IterativeCondition](#conditions) user has access to a context +that implements `TimeContext` as follows: + +{% highlight java %} +/** + * Enables access to time related characteristics such as current processing time or timestamp of + * currently processed element. Used in {@link PatternProcessFunction} and + * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition} + */ +@PublicEvolving +public interface TimeContext { + + /** + * Timestamp of the element currently being processed. + * + *

In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this + * will be set to the time when event entered the cep operator. + */ + long timestamp(); + + /** Returns the current processing time. */ + long currentProcessingTime(); +} +{% endhighlight %} + +This context gives user access to time characteristics of processed events (incoming records in case of `IterativeCondition` and matches in case of `PatternProcessFunction`). +Call to `TimeContext#currentProcessingTime` always gives you the value of current processing time and this call should be preferred to e.g. calling `System.currentTimeMillis()`. + +In case of `TimeContext#timestamp()` the returned value is equal to assigned timestamp in case of `EventTime`. In `ProcessingTime` this will equal to the point of time when said event entered +cep operator (or when the match was generated in case of `PatternProcessFunction`). This means that the value will be consistent across multiple calls to that method. + ## Examples The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data