diff --git a/docs/concepts/timely-stream-processing.md b/docs/concepts/timely-stream-processing.md
index 4953b792a02ab..0040687515b87 100644
--- a/docs/concepts/timely-stream-processing.md
+++ b/docs/concepts/timely-stream-processing.md
@@ -27,75 +27,207 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Time
+## Latency & Completeness
 
-When referring to time in a streaming program (for example to define windows),
-one can refer to different notions of time:
+### Latency vs. Completeness in Batch & Stream Processing
 
- - **Event Time** is the time when an event was created. It is usually
-   described by a timestamp in the events, for example attached by the
-   producing sensor, or the producing service. Flink accesses event timestamps
-   via [timestamp assigners]({{ site.baseurl
-   }}/dev/event_timestamps_watermarks.html).
+{% top %}
 
- - **Ingestion time** is the time when an event enters the Flink dataflow at
-   the source operator.
+## Event Time, Processing Time, and Ingestion Time
 
- - **Processing Time** is the local time at each operator that performs a
-   time-based operation.
+When referring to time in a streaming program (for example to define windows),
+one can refer to different notions of *time*:
+
+- **Processing time:** Processing time refers to the system time of the machine
+  that is executing the respective operation.
+
+  When a streaming program runs on processing time, all time-based operations
+  (like time windows) will use the system clock of the machines that run the
+  respective operator. An hourly processing time window will include all
+  records that arrived at a specific operator between the times when the system
+  clock indicated the full hour. For example, if an application begins running
+  at 9:15am, the first hourly processing time window will include events
+  processed between 9:15am and 10:00am, the next window will include events
+  processed between 10:00am and 11:00am, and so on.
+
+  Processing time is the simplest notion of time and requires no coordination
+  between streams and machines. It provides the best performance and the
+  lowest latency. However, in distributed and asynchronous environments
+  processing time does not provide determinism, because it is susceptible to
+  the speed at which records arrive in the system (for example from the message
+  queue), to the speed at which the records flow between operators inside the
+  system, and to outages (scheduled, or otherwise).
+
+- **Event time:** Event time is the time that each individual event occurred on
+  its producing device. This time is typically embedded within the records
+  before they enter Flink, and that *event timestamp* can be extracted from
+  each record. In event time, the progress of time depends on the data, not on
+  any wall clocks. Event time programs must specify how to generate *Event Time
+  Watermarks*, which is the mechanism that signals progress in event time. The
+  watermarking mechanism is described in
+  [Event Time and Watermarks](#event-time-and-watermarks) below.
+
+  In a perfect world, event time processing would yield completely consistent
+  and deterministic results, regardless of when events arrive, or their
+  ordering. However, unless the events are known to arrive in-order (by
+  timestamp), event time processing incurs some latency while waiting for
+  out-of-order events. As it is only possible to wait for a finite period of
+  time, this places a limit on how deterministic event time applications can
+  be.
+
+  Assuming all of the data has arrived, event time operations will behave as
+  expected, and produce correct and consistent results even when working with
+  out-of-order or late events, or when reprocessing historic data. For example,
+  an hourly event time window will contain all records that carry an event
+  timestamp that falls into that hour, regardless of the order in which they
+  arrive, or when they are processed. (See the section on
+  [late events](#lateness) for more information.)
+
+  Note that sometimes when event time programs are processing live data in
+  real-time, they will use some *processing time* operations in order to
+  guarantee that they are progressing in a timely fashion.
+
+- **Ingestion time:** Ingestion time is the time that events enter Flink. At
+  the source operator each record gets the source's current time as a
+  timestamp, and time-based operations (like time windows) refer to that
+  timestamp.
+
+  *Ingestion time* sits conceptually in between *event time* and *processing
+  time*. Compared to *processing time*, it is slightly more expensive, but
+  gives more predictable results. Because *ingestion time* uses stable
+  timestamps (assigned once at the source), different window operations over
+  the records will refer to the same timestamp, whereas in *processing time*
+  each window operator may assign the record to a different window (based on
+  the local system clock and any transport delay).
+
+  Compared to *event time*, *ingestion time* programs cannot handle any
+  out-of-order events or late data, but the programs don't have to specify how
+  to generate *watermarks*.
+
+  Internally, *ingestion time* is treated much like *event time*, but with
+  automatic timestamp assignment and automatic watermark generation.
+
+[Figure: Event Time, Ingestion Time, and Processing Time]
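+
+The choice between these notions is made once per application, by setting the
+*time characteristic* of the execution environment. The snippet below is a
+minimal, illustrative Java sketch of that step (see the
+[event time docs]({{ site.baseurl }}{% link dev/event_time.md %}) for the
+complete API):
+
+{% highlight java %}
+final StreamExecutionEnvironment env =
+    StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Operate on event time, i.e. on the timestamps embedded in the records.
+// The alternatives are TimeCharacteristic.ProcessingTime (the wall clock of
+// the machines running the operators) and TimeCharacteristic.IngestionTime
+// (timestamps assigned once when records enter Flink at the sources).
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}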
 
-More details on how to handle time are in the [event time docs]({{ site.baseurl
-}}{% link dev/event_time.md %}).
-
 {% top %}
 
-## Windows
+## Event Time and Watermarks
 
-Aggregating events (e.g., counts, sums) works differently on streams than in
-batch processing. For example, it is impossible to count all elements in a
-stream, because streams are in general infinite (unbounded). Instead,
-aggregates on streams (counts, sums, etc), are scoped by **windows**, such as
-*"count over the last 5 minutes"*, or *"sum of the last 100 elements"*.
+*Note: Flink implements many techniques from the Dataflow Model. For a good
+introduction to event time and watermarks, have a look at the articles below.*
 
-Windows can be *time driven* (example: every 30 seconds) or *data driven*
-(example: every 100 elements). One typically distinguishes different types of
-windows, such as *tumbling windows* (no overlap), *sliding windows* (with
-overlap), and *session windows* (punctuated by a gap of inactivity).
+ - [Streaming
+   101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by
+   Tyler Akidau
+ - The [Dataflow Model
+   paper](https://research.google.com/pubs/archive/43864.pdf)
 
-[Figure: Time- and Count Windows]
-
-More window examples can be found in this [blog
-post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html). More
-details are in the [window docs]({{ site.baseurl }}{% link
-dev/stream/operators/windows.md %}).
-
-{% top %}
+A stream processor that supports *event time* needs a way to measure the
+progress of event time. For example, a window operator that builds hourly
+windows needs to be notified when event time has passed beyond the end of an
+hour, so that the operator can close the window in progress.
 
-## Latency & Completeness
+*Event time* can progress independently of *processing time* (measured by wall
+clocks). For example, in one program the current *event time* of an operator
+may trail slightly behind the *processing time* (accounting for a delay in
+receiving the events), while both proceed at the same speed. On the other
+hand, another streaming program might progress through weeks of event time with
+only a few seconds of processing, by fast-forwarding through some historic data
+already buffered in a Kafka topic (or another message queue).
 
-### Latency vs. Completeness in Batch & Stream Processing
+------
 
-{% top %}
+The mechanism in Flink to measure progress in event time is **watermarks**.
+Watermarks flow as part of the data stream and carry a timestamp *t*. A
+*Watermark(t)* declares that event time has reached time *t* in that stream,
+meaning that there should be no more elements from the stream with a timestamp
+*t' <= t* (i.e. events with timestamps older or equal to the watermark).
 
-## Notions of Time
+The figure below shows a stream of events with (logical) timestamps, and
+watermarks flowing inline. In this example the events are in order (with
+respect to their timestamps), meaning that the watermarks are simply periodic
+markers in the stream.
 
-### Event Time
+[Figure: A data stream with events (in order) and watermarks]
 
-`Different Sources of Time (Event, Ingestion, Storage)`
+Watermarks are crucial for *out-of-order* streams, as illustrated below, where
+the events are not ordered by their timestamps. In general a watermark is a
+declaration that by that point in the stream, all events up to a certain
+timestamp should have arrived. Once a watermark reaches an operator, the
+operator can advance its internal *event time clock* to the value of the
+watermark.
 
-### Processing Time
+[Figure: A data stream with events (out of order) and watermarks]
 
+Note that event time is inherited by a freshly created stream element (or
+elements) from either the event that produced them or from the watermark that
+triggered the creation of those elements.
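+
+As an illustration of how a program typically supplies these watermarks in the
+DataStream API, the following sketch assigns timestamps and periodic watermarks
+that allow for up to ten seconds of out-of-orderness. The `MyEvent` type and
+its `getEventTimestamp()` accessor are hypothetical; the available generators
+are described in [Generating Timestamps / Watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html).
+
+{% highlight java %}
+DataStream<MyEvent> withTimestampsAndWatermarks = events
+    .assignTimestampsAndWatermarks(
+        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
+            @Override
+            public long extractTimestamp(MyEvent event) {
+                // The event timestamp is read from the record itself; the
+                // generated watermarks trail the largest timestamp seen so
+                // far by the configured ten seconds.
+                return event.getEventTimestamp();
+            }
+        });
+{% endhighlight %}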
 
-{% top %}
-
-## Making Progress: Watermarks, Processing Time, Lateness
-
-### Propagation of watermarks
-
-{% top %}
+### Watermarks in Parallel Streams
+
+Watermarks are generated at, or directly after, source functions. Each parallel
+subtask of a source function usually generates its watermarks independently.
+These watermarks define the event time at that particular parallel source.
+
+As the watermarks flow through the streaming program, they advance the event
+time at the operators where they arrive. Whenever an operator advances its
+event time, it generates a new watermark downstream for its successor
+operators.
+
+Some operators consume multiple input streams; a union, for example, or
+operators following a *keyBy(...)* or *partition(...)* function. Such an
+operator's current event time is the minimum of its input streams' event times.
+As its input streams update their event times, so does the operator.
+
+The figure below shows an example of events and watermarks flowing through
+parallel streams, and operators tracking event time.
+
+[Figure: Parallel data streams and operators with events and watermarks]
+
+Note that the Kafka source supports per-partition watermarking, which you can
+read more about [here]({{ site.baseurl }}{% link
+dev/event_timestamps_watermarks.md %}#timestamps-per-kafka-partition).
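+
+Conceptually, each operator keeps track of the watermarks it has received on
+its inputs and derives its own event time clock from them. The sketch below
+(an illustration only, not Flink's actual implementation) spells out that rule
+for an operator with two inputs:
+
+{% highlight java %}
+// Tracks the event time clock of a two-input operator: the clock is the
+// minimum of the latest watermarks seen on the two inputs, and it only ever
+// moves forward.
+class TwoInputEventTimeClock {
+    private long watermarkInput1 = Long.MIN_VALUE;
+    private long watermarkInput2 = Long.MIN_VALUE;
+    private long currentEventTime = Long.MIN_VALUE;
+
+    void onWatermark(int inputIndex, long watermarkTimestamp) {
+        if (inputIndex == 1) {
+            watermarkInput1 = Math.max(watermarkInput1, watermarkTimestamp);
+        } else {
+            watermarkInput2 = Math.max(watermarkInput2, watermarkTimestamp);
+        }
+
+        long minInputWatermark = Math.min(watermarkInput1, watermarkInput2);
+        if (minInputWatermark > currentEventTime) {
+            currentEventTime = minInputWatermark;
+            // A real operator would now fire event time timers up to this
+            // time and forward Watermark(currentEventTime) downstream.
+        }
+    }
+}
+{% endhighlight %}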
+
+## Lateness
+
+It is possible that certain elements will violate the watermark condition,
+meaning that even after the *Watermark(t)* has occurred, more elements with
+timestamp *t' <= t* will occur. In fact, in many real-world setups, certain
+elements can be arbitrarily delayed, making it impossible to specify a time by
+which all elements of a certain event timestamp will have occurred.
+Furthermore, even if the lateness can be bounded, delaying the watermarks by
+too much is often not desirable, because it causes too much delay in the
+evaluation of event time windows.
+
+For this reason, streaming programs may explicitly expect some *late* elements.
+Late elements are elements that arrive after the system's event time clock (as
+signaled by the watermarks) has already passed the time of the late element's
+timestamp. See [Allowed Lateness]({{ site.baseurl }}{% link
+dev/stream/operators/windows.md %}#allowed-lateness) for more information on
+how to work with late elements in event time windows.
 
 ## Windowing
 
-{% top %}
+Aggregating events (e.g., counts, sums) works differently on streams than in
+batch processing. For example, it is impossible to count all elements in a
+stream, because streams are in general infinite (unbounded). Instead,
+aggregates on streams (counts, sums, etc.) are scoped by **windows**, such as
+*"count over the last 5 minutes"*, or *"sum of the last 100 elements"*.
+
+Windows can be *time driven* (example: every 30 seconds) or *data driven*
+(example: every 100 elements). One typically distinguishes different types of
+windows, such as *tumbling windows* (no overlap), *sliding windows* (with
+overlap), and *session windows* (punctuated by a gap of inactivity).
+
+[Figure: Time- and Count Windows]
+
+More window examples can be found in this [blog
+post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html). More
+details are in the [window docs]({{ site.baseurl }}{% link
+dev/stream/operators/windows.md %}).
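+
+The sketch below ties the window and lateness concepts together in the
+DataStream API: it counts events per key in tumbling five-minute event time
+windows, keeps the windows around for one extra minute of allowed lateness,
+and sends anything that arrives even later to a side output instead of
+dropping it. The `MyEvent` type, its `getKey()` accessor (assumed to return a
+`String`), and the input stream `events` are hypothetical placeholders.
+
+{% highlight java %}
+final OutputTag<MyEvent> lateEvents = new OutputTag<MyEvent>("late-events") {};
+
+SingleOutputStreamOperator<Long> countsPerWindow = events
+    .keyBy(event -> event.getKey())
+    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
+    .allowedLateness(Time.minutes(1))   // late firings update the emitted result
+    .sideOutputLateData(lateEvents)     // elements later than that are not dropped
+    .process(new ProcessWindowFunction<MyEvent, Long, String, TimeWindow>() {
+        @Override
+        public void process(
+                String key, Context ctx, Iterable<MyEvent> elements, Collector<Long> out) {
+            long count = 0;
+            for (MyEvent ignored : elements) {
+                count++;
+            }
+            // Emit the number of events that fell into this window for this key.
+            out.collect(count);
+        }
+    });
+
+// The late elements remain accessible as a separate stream.
+DataStream<MyEvent> tooLate = countsPerWindow.getSideOutput(lateEvents);
+{% endhighlight %}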
+
+{% top %}
diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
index fcd5a14ebb803..9137a3fd11d39 100644
--- a/docs/dev/event_time.md
+++ b/docs/dev/event_time.md
@@ -24,65 +24,14 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about writing time-aware Flink programs. Please
+take a look at [Timely Stream Processing]({{site.baseurl}}{% link
+concepts/timely-stream-processing.md %}) to learn about the concepts behind
+timely stream processing.
+
 * toc
 {:toc}
 
-# Event Time / Processing Time / Ingestion Time
-
-Flink supports different notions of *time* in streaming programs.
-
-- **Processing time:** Processing time refers to the system time of the machine that is executing the
-  respective operation.
-
-  When a streaming program runs on processing time, all time-based operations (like time windows) will
-  use the system clock of the machines that run the respective operator. An hourly
-  processing time window will include all records that arrived at a specific operator between the
-  times when the system clock indicated the full hour. For example, if an application
-  begins running at 9:15am, the first hourly processing time window will include events
-  processed between 9:15am and 10:00am, the next window will include events processed between 10:00am and 11:00am, and so on.
-
-  Processing time is the simplest notion of time and requires no coordination between streams and machines.
-  It provides the best performance and the lowest latency. However, in distributed and asynchronous
-  environments processing time does not provide determinism, because it is susceptible to the speed at which
-  records arrive in the system (for example from the message queue), to the speed at which the
-  records flow between operators inside the system, and to outages (scheduled, or otherwise).
-
-- **Event time:** Event time is the time that each individual event occurred on its producing device.
-  This time is typically embedded within the records before they enter Flink, and that *event timestamp*
-  can be extracted from each record. In event time, the progress of time depends on the data,
-  not on any wall clocks. Event time programs must specify how to generate *Event Time Watermarks*,
-  which is the mechanism that signals progress in event time. This watermarking mechanism is
-  described in a later section, [below](#event-time-and-watermarks).
-
-  In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering.
-  However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.
-
-  Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records
-  that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on [late events](#late-elements) for more information.)
-
-  Note that sometimes when event time programs are processing live data in real-time, they will use some *processing time* operations in order to guarantee that they are progressing in a timely fashion.
-
-- **Ingestion time:** Ingestion time is the time that events enter Flink. At the source operator each
-  record gets the source's current time as a timestamp, and time-based operations (like time windows)
-  refer to that timestamp.
-
-  *Ingestion time* sits conceptually in between *event time* and *processing time*. Compared to
-  *processing time*, it is slightly more expensive, but gives more predictable results. Because
-  *ingestion time* uses stable timestamps (assigned once at the source), different window operations
-  over the records will refer to the same timestamp, whereas in *processing time* each window operator
-  may assign the record to a different window (based on the local system clock and any transport delay).
-
-  Compared to *event time*, *ingestion time* programs cannot handle any out-of-order events or late data,
-  but the programs don't have to specify how to generate *watermarks*.
-
-  Internally, *ingestion time* is treated much like *event time*, but with automatic timestamp assignment and
-  automatic watermark generation.
-
-
-
 ### Setting a Time Characteristic
 
 The first part of a Flink DataStream program usually sets the base *time characteristic*. That setting
@@ -144,7 +93,6 @@ env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
 
-
 Note that in order to run this example in *event time*, the program needs to either use sources
 that directly define event time for the data and emit watermarks themselves, or the program must
 inject a *Timestamp Assigner & Watermark Generator* after the sources. Those functions describe how to access
@@ -154,77 +102,7 @@ The section below describes the general mechanism behind *timestamps* and *water
 to use timestamp assignment and watermark generation in the Flink DataStream API, please refer to
 [Generating Timestamps / Watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html).
 
-
-# Event Time and Watermarks
-
-*Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, have a look at the articles below.*
-
-  - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau
-  - The [Dataflow Model paper](https://research.google.com/pubs/archive/43864.pdf)
-
-
-A stream processor that supports *event time* needs a way to measure the progress of event time.
-For example, a window operator that builds hourly windows needs to be notified when event time has passed beyond the
-end of an hour, so that the operator can close the window in progress.
-
-*Event time* can progress independently of *processing time* (measured by wall clocks).
-For example, in one program the current *event time* of an operator may trail slightly behind the *processing time*
-(accounting for a delay in receiving the events), while both proceed at the same speed.
-On the other hand, another streaming program might progress through weeks of event time with only a few seconds of processing,
-by fast-forwarding through some historic data already buffered in a Kafka topic (or another message queue).
-
-------
-
-The mechanism in Flink to measure progress in event time is **watermarks**.
-Watermarks flow as part of the data stream and carry a timestamp *t*. A *Watermark(t)* declares that event time has reached time
-*t* in that stream, meaning that there should be no more elements from the stream with a timestamp *t' <= t* (i.e. events with timestamps
-older or equal to the watermark).
-
-The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. In this example the events are in order
-(with respect to their timestamps), meaning that the watermarks are simply periodic markers in the stream.
-
-[Figure: A data stream with events (in order) and watermarks]
-
-Watermarks are crucial for *out-of-order* streams, as illustrated below, where the events are not ordered by their timestamps.
-In general a watermark is a declaration that by that point in the stream, all events up to a certain timestamp should have arrived.
-Once a watermark reaches an operator, the operator can advance its internal *event time clock* to the value of the watermark.
-
-[Figure: A data stream with events (out of order) and watermarks]
-
-Note that event time is inherited by a freshly created stream element (or elements) from either the event that produced them or
-from watermark that triggered creation of those elements.
-
-## Watermarks in Parallel Streams
-
-Watermarks are generated at, or directly after, source functions.
-Each parallel subtask of a source function usually generates its watermarks independently.
-These watermarks define the event time at that particular parallel source.
-
-As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an
-operator advances its event time, it generates a new watermark downstream for its successor operators.
-
-Some operators consume multiple input streams; a union, for example, or operators following a *keyBy(...)* or *partition(...)* function.
-Such an operator's current event time is the minimum of its input streams' event times. As its input streams
-update their event times, so does the operator.
-
-The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.
-
-[Figure: Parallel data streams and operators with events and watermarks]
-
-Note that the Kafka source supports per-partition watermarking, which you can read more about [here]({{ site.baseurl }}/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition).
-
-
-## Late Elements
-
-It is possible that certain elements will violate the watermark condition, meaning that even after the *Watermark(t)* has occurred,
-more elements with timestamp *t' <= t* will occur. In fact, in many real world setups, certain elements can be arbitrarily
-delayed, making it impossible to specify a time by which all elements of a certain event timestamp will have occurred.
-Furthermore, even if the lateness can be bounded, delaying the watermarks by too much is often not desirable, because it
-causes too much delay in the evaluation of event time windows.
-
-For this reason, streaming programs may explicitly expect some *late* elements. Late elements are elements that
-arrive after the system's event time clock (as signaled by the watermarks) has already passed the time of the late element's
-timestamp. See [Allowed Lateness]({{ site.baseurl }}/dev/stream/operators/windows.html#allowed-lateness) for more information on how to work
-with late elements in event time windows.
+{% top %}
 
 ## Idling sources
 
diff --git a/docs/fig/times_clocks.svg b/docs/fig/times_clocks.svg
deleted file mode 100644
index 2dede77501c6c..0000000000000
--- a/docs/fig/times_clocks.svg
+++ /dev/null
@@ -1,368 +0,0 @@
[368 lines of SVG markup removed. The deleted figure showed an event producer sending records through a message queue (partitions 1 and 2) into a Flink data source and on to a Flink window operator, annotated with where event time, ingestion time, and window processing time are taken.]