From 96c60ffaf0e4e1df0578396ec91095855ac30e82 Mon Sep 17 00:00:00 2001
From: Nico Kruber
Date: Wed, 22 Apr 2020 14:36:42 +0200
Subject: [PATCH] [hotfix][docs] copy concepts/*stream-processing over to zh
 docs

---
 .../concepts/stateful-stream-processing.zh.md | 340 ++++++++++++++++++
 docs/concepts/timely-stream-processing.zh.md  | 211 +++++++++++
 2 files changed, 551 insertions(+)
 create mode 100644 docs/concepts/stateful-stream-processing.zh.md
 create mode 100644 docs/concepts/timely-stream-processing.zh.md

diff --git a/docs/concepts/stateful-stream-processing.zh.md b/docs/concepts/stateful-stream-processing.zh.md
new file mode 100644
index 0000000000000..a7cefa99e99ea
--- /dev/null
+++ b/docs/concepts/stateful-stream-processing.zh.md
@@ -0,0 +1,340 @@
---
title: Stateful Stream Processing
nav-id: stateful-stream-processing
nav-pos: 2
nav-title: Stateful Stream Processing
nav-parent_id: concepts
---

While many operations in a dataflow simply look at one individual *event at a
time* (for example an event parser), some operations remember information
across multiple events (for example window operators). These operations are
called **stateful**.

Some examples of stateful operations:

  - When an application searches for certain event patterns, the state will
    store the sequence of events encountered so far.
  - When aggregating events per minute/hour/day, the state holds the pending
    aggregates.
  - When training a machine learning model over a stream of data points, the
    state holds the current version of the model parameters.
  - When historic data needs to be managed, the state allows efficient access
    to events that occurred in the past.

Flink needs to be aware of the state in order to make it fault tolerant using
[checkpoints]({{ site.baseurl }}{% link dev/stream/state/checkpointing.zh.md %})
and [savepoints]({{ site.baseurl }}{% link ops/state/savepoints.zh.md %}).

Knowledge about the state also allows for rescaling Flink applications, meaning
that Flink takes care of redistributing state across parallel instances.

[Queryable state]({{ site.baseurl }}{% link dev/stream/state/queryable_state.zh.md
%}) allows you to access state from outside of Flink at runtime.

When working with state, it might also be useful to read about [Flink's state
backends]({{ site.baseurl }}{% link ops/state/state_backends.zh.md %}). Flink
provides different state backends that specify how and where state is stored.

* This will be replaced by the TOC
{:toc}

## What is State?

`TODO: expand this section`

{% top %}

## State in Stream & Batch Processing

`TODO: What is this section about? Do we even need it?`

{% top %}

## Keyed State

Keyed state is maintained in what can be thought of as an embedded key/value
store. The state is partitioned and distributed strictly together with the
streams that are read by the stateful operators. Hence, access to the key/value
state is only possible on *keyed streams*, i.e. after a keyed/partitioned data
exchange, and is restricted to the values associated with the current event's
key. Aligning the keys of streams and state makes sure that all state updates
are local operations, guaranteeing consistency without transaction overhead.
This alignment also allows Flink to redistribute the state and adjust the
stream partitioning transparently.

*Figure: State and Partitioning*
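
As a minimal sketch of how keyed state looks in code, the following function
keeps one independent counter per key in a `ValueState`. The class name, the
state name, and the counting logic are invented for this illustration; only
the `ValueState`/`ValueStateDescriptor` API itself is Flink's:

{% highlight java %}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<String, Long> {

    // Keyed state: each key sees its own, independent counter.
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();   // state of the current event's key only
        long updated = (current == null) ? 1L : current + 1L;
        count.update(updated);          // a purely local update
        out.collect(updated);
    }
}
{% endhighlight %}

Used as `stream.keyBy(value -> value).flatMap(new CountPerKey())`, the
function can only ever see and update the state belonging to the key of the
record it is currently processing.
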
Keyed State is further organized into so-called *Key Groups*. Key Groups are
the atomic unit by which Flink can redistribute Keyed State; there are exactly
as many Key Groups as the defined maximum parallelism. During execution each
parallel instance of a keyed operator works with the keys for one or more Key
Groups.

## State Persistence

Flink implements fault tolerance using a combination of **stream replay** and
**checkpointing**. A checkpoint marks a specific point in each of the
input streams along with the corresponding state for each of the operators. A
streaming dataflow can be resumed from a checkpoint while maintaining
consistency *(exactly-once processing semantics)* by restoring the state of the
operators and replaying the records from the point of the checkpoint.

The checkpoint interval is a means of trading off the overhead of fault
tolerance during execution with the recovery time (the number of records that
need to be replayed).

The fault tolerance mechanism continuously draws snapshots of the distributed
streaming data flow. For streaming applications with small state, these
snapshots are very light-weight and can be drawn frequently without much impact
on performance. The state of the streaming applications is stored at a
configurable place, usually in a distributed file system.

In case of a program failure (due to machine, network, or software failure),
Flink stops the distributed streaming dataflow. The system then restarts the
operators and resets them to the latest successful checkpoint. The input
streams are reset to the point of the state snapshot. Any records that are
processed as part of the restarted parallel dataflow are guaranteed not to have
affected the previously checkpointed state.

{% info Note %} By default, checkpointing is disabled. See [Checkpointing]({{
site.baseurl }}{% link dev/stream/state/checkpointing.zh.md %}) for details on how
to enable and configure checkpointing.

{% info Note %} For this mechanism to realize its full guarantees, the data
stream source (such as a message queue or broker) needs to be able to rewind the
stream to a defined recent point. [Apache Kafka](http://kafka.apache.org) has
this ability and Flink's connector to Kafka exploits this. See [Fault
Tolerance Guarantees of Data Sources and Sinks]({{ site.baseurl }}{% link
dev/connectors/guarantees.zh.md %}) for more information about the guarantees
provided by Flink's connectors.

{% info Note %} Because Flink's checkpoints are realized through distributed
snapshots, we use the words *snapshot* and *checkpoint* interchangeably. Often
we also use the term *snapshot* to mean either *checkpoint* or *savepoint*.

### Checkpointing

The central part of Flink's fault tolerance mechanism is drawing consistent
snapshots of the distributed data stream and operator state. These snapshots
act as consistent checkpoints to which the system can fall back in case of a
failure. Flink's mechanism for drawing these snapshots is described in
"[Lightweight Asynchronous Snapshots for Distributed
Dataflows](http://arxiv.org/abs/1506.08603)". It is inspired by the standard
[Chandy-Lamport
algorithm](http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf)
for distributed snapshots and is specifically tailored to Flink's execution
model.

Keep in mind that everything to do with checkpointing can be done
asynchronously. The checkpoint barriers don't travel in lock step and
operators can asynchronously snapshot their state.
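
For reference, checkpointing is switched on via the execution environment. A
minimal sketch, where the 10-second interval and the HDFS checkpoint directory
are placeholder values, not recommendations:

{% highlight java %}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Draw a snapshot every 10 seconds; the interval trades checkpointing
        // overhead during execution against the recovery time.
        env.enableCheckpointing(10_000);

        // Store snapshots in a distributed file system (placeholder path).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // ... build and execute the actual dataflow here ...
    }
}
{% endhighlight %}
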
#### Barriers

A core element in Flink's distributed snapshotting is the *stream barrier*.
These barriers are injected into the data stream and flow with the records as
part of the data stream. Barriers never overtake records; they flow strictly in
line. A barrier separates the records in the data stream into the set of
records that goes into the current snapshot and the records that go into the
next snapshot. Each barrier carries the ID of the snapshot whose records it
pushes in front of it. Barriers do not interrupt the flow of the stream and are
hence very lightweight. Multiple barriers from different snapshots can be in
the stream at the same time, which means that various snapshots may happen
concurrently.


*Figure: Checkpoint barriers in data streams*

Stream barriers are injected into the parallel data flow at the stream sources.
The point where the barriers for snapshot *n* are injected (let's call it
S<sub>n</sub>) is the position in the source stream up to which the
snapshot covers the data. For example, in Apache Kafka, this position would be
the last record's offset in the partition. This position S<sub>n</sub>
is reported to the *checkpoint coordinator* (Flink's JobManager).

The barriers then flow downstream. When an intermediate operator has received a
barrier for snapshot *n* from all of its input streams, it emits a barrier for
snapshot *n* into all of its outgoing streams. Once a sink operator (the end of
a streaming DAG) has received barrier *n* from all of its input streams, it
acknowledges snapshot *n* to the checkpoint coordinator. After all sinks
have acknowledged a snapshot, it is considered completed.

Once snapshot *n* has been completed, the job will never again ask the source
for records from before S<sub>n</sub>, since at that point these records
(and their descendant records) will have passed through the entire data flow
topology.


*Figure: Aligning data streams at operators with multiple inputs*

Operators that receive more than one input stream need to *align* the input
streams on the snapshot barriers. The figure above illustrates this:

  - As soon as the operator receives snapshot barrier *n* from an incoming
    stream, it cannot process any further records from that stream until it has
    received the barrier *n* from the other inputs as well. Otherwise, it would
    mix records that belong to snapshot *n* with records that belong to
    snapshot *n+1*.
  - Streams that have reported barrier *n* are temporarily set aside. Records
    that are received from these streams are not processed, but put into an
    input buffer.
  - Once the last stream has delivered barrier *n*, the operator emits all
    pending outgoing records, and then emits the snapshot *n* barriers itself.
  - After that, it resumes processing records from all input streams,
    processing records from the input buffers before processing the records
    from the streams.

A simplified sketch of this alignment logic is shown below, after the
illustration of the checkpointing mechanism.

#### Snapshotting Operator State

When operators contain any form of *state*, this state must be part of the
snapshots as well.

Operators snapshot their state at the point in time when they have received all
snapshot barriers from their input streams, and before emitting the barriers to
their output streams. At that point, all updates to the state from records
before the barriers will have been made, and no updates that depend on records
from after the barriers will have been applied. Because the state of a snapshot
may be large, it is stored in a configurable *[state backend]({{ site.baseurl }}{%
link ops/state/state_backends.zh.md %})*. By default, this is the JobManager's
memory, but for production use a distributed reliable storage should be
configured (such as HDFS). After the state has been stored, the operator
acknowledges the checkpoint, emits the snapshot barrier into the output
streams, and proceeds.

The resulting snapshot now contains:

  - For each parallel stream data source, the offset/position in the stream
    when the snapshot was started
  - For each operator, a pointer to the state that was stored as part of the
    snapshot
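
To make this structure concrete, here is a toy model of the metadata such a
completed snapshot logically carries. The class and field names are invented
for this sketch and do not correspond to Flink's internal classes:

{% highlight java %}
import java.util.Map;

// Toy model of the two ingredients listed above: one input position per
// parallel source, and one pointer to stored state per operator.
public class CompletedSnapshot {
    final long snapshotId;                           // the "n" of snapshot n
    final Map<String, Long> sourceOffsets;           // e.g. Kafka partition -> offset
    final Map<String, String> operatorStatePointers; // operator -> state location

    CompletedSnapshot(long snapshotId,
                      Map<String, Long> sourceOffsets,
                      Map<String, String> operatorStatePointers) {
        this.snapshotId = snapshotId;
        this.sourceOffsets = sourceOffsets;
        this.operatorStatePointers = operatorStatePointers;
    }
}
{% endhighlight %}
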

*Figure: Illustration of the Checkpointing Mechanism*

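
The following is the simplified alignment sketch referenced above: a toy
two-input operator that sets records aside and snapshots its state once
barriers from both inputs have arrived. It is illustrative only and not
Flink's actual implementation; all names are invented, and concerns such as
emitting pending output or handling more than two inputs are omitted:

{% highlight java %}
import java.util.ArrayDeque;
import java.util.Queue;

public class AligningOperator {

    private final Queue<String> bufferedRecords = new ArrayDeque<>();
    private final boolean[] barrierSeen = new boolean[2];

    /** Called for every record arriving on the given input channel. */
    public void onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            // This channel is already past the barrier: its records belong
            // to the next snapshot, so set them aside instead of processing.
            bufferedRecords.add(record);
        } else {
            process(record);
        }
    }

    /** Called when a checkpoint barrier arrives on the given input channel. */
    public void onBarrier(int channel, long checkpointId) {
        barrierSeen[channel] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            snapshotState(checkpointId);       // all inputs aligned: snapshot now
            emitBarrierDownstream(checkpointId);
            barrierSeen[0] = barrierSeen[1] = false;
            // Resume with the records that were set aside during alignment.
            while (!bufferedRecords.isEmpty()) {
                process(bufferedRecords.poll());
            }
        }
    }

    private void process(String record) { /* apply the operator logic */ }
    private void snapshotState(long checkpointId) { /* write state to the backend */ }
    private void emitBarrierDownstream(long checkpointId) { /* forward the barrier */ }
}
{% endhighlight %}
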
#### Recovery

Recovery under this mechanism is straightforward: upon a failure, Flink selects
the latest completed checkpoint *k*. The system then re-deploys the entire
distributed dataflow, and gives each operator the state that was snapshotted as
part of checkpoint *k*. The sources are set to start reading the stream from
position S<sub>k</sub>. For example, in Apache Kafka, that means telling
the consumer to start fetching from offset S<sub>k</sub>.

If state was snapshotted incrementally, the operators start with the state of
the latest full snapshot and then apply a series of incremental snapshot
updates to that state.

See [Restart Strategies]({{ site.baseurl }}{% link dev/task_failure_recovery.zh.md
%}#restart-strategies) for more information.

### State Backends

`TODO: expand this section`

The exact data structures in which the key/value indexes are stored depend on
the chosen [state backend]({{ site.baseurl }}{% link
ops/state/state_backends.zh.md %}). One state backend stores data in an in-memory
hash map, another state backend uses [RocksDB](http://rocksdb.org) as the
key/value store. In addition to defining the data structure that holds the
state, the state backends also implement the logic to take a point-in-time
snapshot of the key/value state and store that snapshot as part of a
checkpoint. State backends can be configured without changing your application
logic.

*Figure: Checkpoints and snapshots*

{% top %}

### Savepoints

`TODO: expand this section`

All programs that use checkpointing can resume execution from a **savepoint**.
Savepoints allow you to update both your program and your Flink cluster without
losing any state.

[Savepoints]({{ site.baseurl }}{% link ops/state/savepoints.zh.md %}) are
**manually triggered checkpoints**, which take a snapshot of the program and
write it out to a state backend. They rely on the regular checkpointing
mechanism for this.

Savepoints are similar to checkpoints except that they are
**triggered by the user** and **don't automatically expire** when newer
checkpoints are completed.

{% top %}

### Exactly Once vs. At Least Once

The alignment step may add latency to the streaming program. Usually, this
extra latency is on the order of a few milliseconds, but we have seen cases
where the latency of some outliers increased noticeably. For applications that
require consistently very low latencies (a few milliseconds) for all records,
Flink has a switch to skip the stream alignment during a checkpoint. Checkpoint
snapshots are still drawn as soon as an operator has seen the checkpoint
barrier from each input.

When the alignment is skipped, an operator keeps processing all inputs, even
after some checkpoint barriers for checkpoint *n* have arrived. That way, the
operator also processes elements that belong to checkpoint *n+1* before the
state snapshot for checkpoint *n* has been taken. On a restore, these records
will occur as duplicates, because they are included in the state snapshot of
checkpoint *n* and will also be replayed as part of the data after checkpoint
*n*.

{% info Note %} Alignment happens only for operators with multiple predecessors
(joins) as well as for operators with multiple senders (after a stream
repartitioning/shuffle). Because of that, dataflows with only embarrassingly
parallel streaming operations (`map()`, `flatMap()`, `filter()`, ...) actually
give *exactly once* guarantees even in *at least once* mode.
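
For reference, this switch is exposed through the checkpointing mode. A
minimal sketch, where the 10-second interval is a placeholder value:

{% highlight java %}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip barrier alignment: lower latency during checkpoints, but
        // records may be replayed (at-least-once) after a failure.
        env.enableCheckpointing(10_000, CheckpointingMode.AT_LEAST_ONCE);
    }
}
{% endhighlight %}
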
{% top %}

## End-to-end Exactly-Once Programs

`TODO: add`

## State and Fault Tolerance in Batch Programs

Flink executes [batch programs](../dev/batch/index.html) as a special case of
streaming programs, where the streams are bounded (finite number of elements).
A *DataSet* is treated internally as a stream of data. The concepts above thus
apply to batch programs in the same way as they apply to streaming programs,
with minor exceptions:

  - [Fault tolerance for batch programs](../dev/batch/fault_tolerance.html)
    does not use checkpointing. Recovery happens by fully replaying the
    streams. That is possible, because inputs are bounded. This pushes the
    cost more towards the recovery, but makes the regular processing cheaper,
    because it avoids checkpoints.

  - Stateful operations in the DataSet API use simplified in-memory/out-of-core
    data structures, rather than key/value indexes.

  - The DataSet API introduces special synchronized (superstep-based)
    iterations, which are only possible on bounded streams. For details, check
    out the [iteration docs]({{ site.baseurl }}/dev/batch/iterations.html).

{% top %}
diff --git a/docs/concepts/timely-stream-processing.zh.md b/docs/concepts/timely-stream-processing.zh.md
new file mode 100644
index 0000000000000..5d6f197dd36c6
--- /dev/null
+++ b/docs/concepts/timely-stream-processing.zh.md
@@ -0,0 +1,211 @@
---
title: Timely Stream Processing
nav-id: timely-stream-processing
nav-pos: 3
nav-title: Timely Stream Processing
nav-parent_id: concepts
---

`TODO: add introduction`

* This will be replaced by the TOC
{:toc}

## Latency & Completeness

`TODO: add these two sections`

### Latency vs. Completeness in Batch & Stream Processing

{% top %}

## Notions of Time: Event Time and Processing Time

When referring to time in a streaming program (for example to define windows),
one can refer to different notions of *time*:

- **Processing time:** Processing time refers to the system time of the machine
  that is executing the respective operation.

  When a streaming program runs on processing time, all time-based operations
  (like time windows) will use the system clock of the machines that run the
  respective operator. An hourly processing time window will include all
  records that arrived at a specific operator between the times when the system
  clock indicated the full hour. For example, if an application begins running
  at 9:15am, the first hourly processing time window will include events
  processed between 9:15am and 10:00am, the next window will include events
  processed between 10:00am and 11:00am, and so on.

  Processing time is the simplest notion of time and requires no coordination
  between streams and machines. It provides the best performance and the
  lowest latency. However, in distributed and asynchronous environments
  processing time does not provide determinism, because it is susceptible to
  the speed at which records arrive in the system (for example from the message
  queue), to the speed at which the records flow between operators inside the
  system, and to outages (scheduled, or otherwise).

- **Event time:** Event time is the time that each individual event occurred on
  its producing device. This time is typically embedded within the records
  before they enter Flink, and that *event timestamp* can be extracted from
  each record. In event time, the progress of time depends on the data, not on
  any wall clocks.
  Event time programs must specify how to generate *Event Time
  Watermarks*, which is the mechanism that signals progress in event time. This
  watermarking mechanism is described [below](#event-time-and-watermarks).

  In a perfect world, event time processing would yield completely consistent
  and deterministic results, regardless of when events arrive, or their
  ordering. However, unless the events are known to arrive in order (by
  timestamp), event time processing incurs some latency while waiting for
  out-of-order events. As it is only possible to wait for a finite period of
  time, this places a limit on how deterministic event time applications can
  be.

  Assuming all of the data has arrived, event time operations will behave as
  expected, and produce correct and consistent results even when working with
  out-of-order or late events, or when reprocessing historic data. For example,
  an hourly event time window will contain all records that carry an event
  timestamp that falls into that hour, regardless of the order in which they
  arrive, or when they are processed. (See the section on [late
  events](#late-elements) for more information.)

  Note that sometimes when event time programs are processing live data in
  real-time, they will use some *processing time* operations in order to
  guarantee that they are progressing in a timely fashion.

*Figure: Event Time and Processing Time*

{% top %}

## Event Time and Watermarks

*Note: Flink implements many techniques from the Dataflow Model. For a good
introduction to event time and watermarks, have a look at the articles below.*

  - [Streaming
    101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by
    Tyler Akidau
  - The [Dataflow Model
    paper](https://research.google.com/pubs/archive/43864.pdf)

A stream processor that supports *event time* needs a way to measure the
progress of event time. For example, a window operator that builds hourly
windows needs to be notified when event time has passed beyond the end of an
hour, so that the operator can close the window in progress.

*Event time* can progress independently of *processing time* (measured by wall
clocks). For example, in one program the current *event time* of an operator
may trail slightly behind the *processing time* (accounting for a delay in
receiving the events), while both proceed at the same speed. On the other
hand, another streaming program might progress through weeks of event time with
only a few seconds of processing, by fast-forwarding through some historic data
already buffered in a Kafka topic (or another message queue).

------

The mechanism in Flink to measure progress in event time is **watermarks**.
Watermarks flow as part of the data stream and carry a timestamp *t*. A
*Watermark(t)* declares that event time has reached time *t* in that stream,
meaning that there should be no more elements from the stream with a timestamp
*t' <= t* (i.e. events with timestamps older than or equal to the watermark).

The figure below shows a stream of events with (logical) timestamps, and
watermarks flowing inline. In this example the events are in order (with
respect to their timestamps), meaning that the watermarks are simply periodic
markers in the stream.

*Figure: A data stream with events (in order) and watermarks*
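
In practice, watermark generation is specified on the stream itself. A minimal
sketch, assuming an existing `DataStream<Tuple2<String, Long>>` named `events`
whose `f1` field carries the event timestamp, and using a five-second
out-of-orderness bound as a placeholder value:

{% highlight java %}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Extract the embedded event timestamp and emit watermarks that trail the
// largest timestamp seen so far by five seconds.
DataStream<Tuple2<String, Long>> withWatermarks = events
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
                Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> event) {
                return event.f1; // the event timestamp, in milliseconds
            }
        });
{% endhighlight %}

This extractor tolerates events that are at most five seconds out of order:
events arriving later than that are *late* with respect to the watermark.
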
Watermarks are crucial for *out-of-order* streams, as illustrated below, where
the events are not ordered by their timestamps. In general a watermark is a
declaration that by that point in the stream, all events up to a certain
timestamp should have arrived. Once a watermark reaches an operator, the
operator can advance its internal *event time clock* to the value of the
watermark.

*Figure: A data stream with events (out of order) and watermarks*

Note that event time is inherited by a freshly created stream element (or
elements) either from the event that produced them or from the watermark that
triggered the creation of those elements.

### Watermarks in Parallel Streams

Watermarks are generated at, or directly after, source functions. Each parallel
subtask of a source function usually generates its watermarks independently.
These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event
time at the operators where they arrive. Whenever an operator advances its
event time, it generates a new watermark downstream for its successor
operators.

Some operators consume multiple input streams; a union, for example, or
operators following a *keyBy(...)* or *partition(...)* function. Such an
operator's current event time is the minimum of its input streams' event times.
As its input streams update their event times, so does the operator.

The figure below shows an example of events and watermarks flowing through
parallel streams, and operators tracking event time.

*Figure: Parallel data streams and operators with events and watermarks*

## Lateness

It is possible that certain elements will violate the watermark condition,
meaning that even after the *Watermark(t)* has occurred, more elements with
timestamp *t' <= t* will occur. In fact, in many real-world setups, certain
elements can be arbitrarily delayed, making it impossible to specify a time by
which all elements of a certain event timestamp will have occurred.
Furthermore, even if the lateness can be bounded, delaying the watermarks by
too much is often not desirable, because it causes too much delay in the
evaluation of event time windows.

For this reason, streaming programs may explicitly expect some *late* elements.
Late elements are elements that arrive after the system's event time clock (as
signaled by the watermarks) has already passed the time of the late element's
timestamp. See [Allowed Lateness]({{ site.baseurl }}{% link
dev/stream/operators/windows.zh.md %}#allowed-lateness) for more information on
how to work with late elements in event time windows.

## Windowing

Aggregating events (e.g., counts, sums) works differently on streams than in
batch processing. For example, it is impossible to count all elements in a
stream, because streams are in general infinite (unbounded). Instead,
aggregates on streams (counts, sums, etc.) are scoped by **windows**, such as
*"count over the last 5 minutes"*, or *"sum of the last 100 elements"*.

Windows can be *time driven* (example: every 30 seconds) or *data driven*
(example: every 100 elements). One typically distinguishes different types of
windows, such as *tumbling windows* (no overlap), *sliding windows* (with
overlap), and *session windows* (punctuated by a gap of inactivity), as
sketched in the code example at the end of this section.

*Figure: Time- and Count Windows*

Please check out this [blog
post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html) for
additional examples of windows or take a look at the [window documentation]({{
site.baseurl }}{% link dev/stream/operators/windows.zh.md %}) of the DataStream
API.
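
The following sketch puts a window type and the allowed-lateness setting from
the previous section together. It assumes a `DataStream<Tuple2<String, Long>>`
named `input` with timestamps and watermarks already assigned (as in the
watermark example above); the window size and lateness bound are placeholder
values:

{% highlight java %}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Tuple2<String, Long>> sums = input
    .keyBy(value -> value.f0)                              // partition by key
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))  // time driven, no overlap
    .allowedLateness(Time.minutes(1))                      // tolerate late elements
    .sum(1);                                               // sum the value field
{% endhighlight %}

{% top %}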