Skip to content

Commit

Permalink
[FLINK-10596][cep, docs] Added description of TimeContext to docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jan 9, 2019
1 parent 0a46510 commit 47e9ccf
Showing 1 changed file with 39 additions and 2 deletions.
41 changes: 39 additions & 2 deletions docs/dev/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ middle.oneOrMore()
previously accepted events for a given potential match. The cost of this operation can vary, so when implementing
your condition, try to minimize its use.

Described context gives one access to event time characteristics as well. For more info see [Time context](#time-context).

**Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides
whether to accept an event or not, based *only* on properties of the event itself.

Expand Down Expand Up @@ -1508,6 +1510,7 @@ class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT>

The `PatternProcessFunction` gives access to a `Context` object. Thanks to it, one can access time related
characteristics such as `currentProcessingTime` or `timestamp` of current match (which is the timestamp of the last element assigned to the match).
For more info see [Time context](#time-context).
Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).


Expand Down Expand Up @@ -1594,7 +1597,9 @@ val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
</div>
</div>

## Handling Lateness in Event Time
## Time in CEP library

### Handling Lateness in Event Time

In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.

Expand All @@ -1620,7 +1625,6 @@ SingleOutputStreamOperator<ComplexEvent> result = patternStream

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);


{% endhighlight %}

</div>
Expand All @@ -1646,6 +1650,39 @@ val lateData: DataStream<String> = result.getSideOutput(lateDataOutputTag)
</div>
</div>

### Time context

In [PatternProcessFunction](#selecting-from-patterns) as well as in [IterativeCondition](#conditions) user has access to a context
that implements `TimeContext` as follows:

{% highlight java %}
/**
* Enables access to time related characteristics such as current processing time or timestamp of
* currently processed element. Used in {@link PatternProcessFunction} and
* {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
*/
@PublicEvolving
public interface TimeContext {

/**
* Timestamp of the element currently being processed.
*
* <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
* will be set to the time when event entered the cep operator.
*/
long timestamp();

/** Returns the current processing time. */
long currentProcessingTime();
}
{% endhighlight %}

This context gives user access to time characteristics of processed events (incoming records in case of `IterativeCondition` and matches in case of `PatternProcessFunction`).
Call to `TimeContext#currentProcessingTime` always gives you the value of current processing time and this call should be preferred to e.g. calling `System.currentTimeMillis()`.

In case of `TimeContext#timestamp()` the returned value is equal to assigned timestamp in case of `EventTime`. In `ProcessingTime` this will equal to the point of time when said event entered
cep operator (or when the match was generated in case of `PatternProcessFunction`). This means that the value will be consistent across multiple calls to that method.

## Examples

The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data
Expand Down

0 comments on commit 47e9ccf

Please sign in to comment.