State timers documentation #10854
R: @dpmills |
Beam provides several types of state: | ||
|
||
#### ValueState | ||
A DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each |
This paragraph shouldn't be under ValueState; it should be in the State and Timers section
done
A DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each | ||
state must be named using the `StateId` annotation; this name is unique to a ParDo in the graph and has no relation | ||
to other nodes in the graph. A `DoFn` can declare multiple state variables. | ||
|
Need an intro for what ValueState is
done
``` | ||
|
||
### 10.3 Timers {#timers} | ||
Beam provides a per-key timer callback API. This allows for delayed processing of data aggregated using the state API. |
s/aggregated/stored/
done
@ProcessElement public void process(@TimerId("timer") Timer timer) { | ||
... | ||
// Set a timer to go off 30 seconds in the future. | ||
timer.offset(Duration.standardSeconds(30).setRelative(); |
Missing a paren. Would read better as
`timer.setRelative().offset(Duration.standardSeconds(30))`
Unfortunately today the timer is set in the setRelative() method, so the subsequent offset() does not do anything!
Also setRelative() is a void method :)
fixed paren
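For anyone following this thread, here is a minimal plain-Java sketch of why the call order matters. `SketchTimer` is a hypothetical stand-in written only for illustration, not Beam's `Timer` class; it assumes just the two behaviours described in the comments above (the timer is armed inside `setRelative()`, and `setRelative()` returns `void`). The explicit `now` argument is also an assumption, added so the sketch is deterministic.

```java
import java.time.Duration;
import java.time.Instant;

class TimerOrderSketch {
    /** Hypothetical stand-in for a timer handle; NOT Beam's Timer API. */
    static final class SketchTimer {
        private Duration offset = Duration.ZERO;
        private Instant firingTime; // stays null until the timer is armed

        // Records the offset and returns this, so a terminal call can follow.
        SketchTimer offset(Duration d) {
            this.offset = d;
            return this;
        }

        // Arms the timer using the offset recorded so far. Because it returns
        // void, nothing can be chained after it: setRelative(now).offset(...)
        // would not compile.
        void setRelative(Instant now) {
            this.firingTime = now.plus(offset);
        }

        Instant firingTime() {
            return firingTime;
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-02-14T00:00:00Z");

        SketchTimer timer = new SketchTimer();
        // Configure first, arm last.
        timer.offset(Duration.ofSeconds(30)).setRelative(now);
        System.out.println("armed for: " + timer.firingTime());

        // Changing the offset after arming does not move the firing time.
        timer.offset(Duration.ofSeconds(60));
        System.out.println("still armed for: " + timer.firingTime());
    }
}
```

Under those assumptions, the order used in the diff (`offset(...)` first, `setRelative()` last) is the only one that both compiles and takes the offset into account.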
For processing-time timers, the default output timestamp and watermark hold is the value of the input watermark at the | ||
time the timer was set. | ||
|
||
In some cases, a pipeline needs to output timestamps earlier than the timer expiration time, and therefore also needs to |
s/pipeline/DoFn/
done
} | ||
```` | ||
|
||
#### 10.5.2 Batching RPCs {#batching-rpcs} |
Might be worth including a link to the GroupIntoBatches transform here
@ProcessElement public void process( | ||
@Element KV<String, ValueT> element, | ||
@StateId("elementBag") BagState<ValueT> elementBag, | ||
@StateId("timerSet") ValueState<Boolean> timerSet, |
timerSet is a confusing name, use isTimerSet or timerIsSet instead, here and below
done
|
||
#### 10.3.3 Dynamic timer tags {#dynamic-timer-tags} | ||
Beam also supports dynamically setting a timer tag using `TimerMap`. This allows for setting multiple different timers | ||
in a `ParDo` and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A |
Be consistent between using DoFn vs ParDo; I think DoFn is probably better
done
A common use case for state is to accumulate multiple elements. `BagState` allows for accumulating elements in an | ||
unordered manner. This allows for addition of elements to the collection without requiring the reading of the entire | ||
collection first, which is an efficiency gain. In addition, runners that support paged reads can allow individual | ||
bag collections larger than available memory. |
bag collections seems redundant; maybe just "bags"?
done
})); | ||
``` | ||
### 10.2 Deferred state reads {#deferred-state-reads} | ||
When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the read() function |
put code in backticks
done
Minor comments.
|
||
## 10. State and Timers {#state-and-timers} | ||
Beam's windowing and triggering facilities provide a powerful abstraction for grouping and aggregating unbounded input | ||
data based on timestamps. However there are aggregation use cases for which Beam's windows and triggers are not the best |
However there are aggregation use cases for which developers may require a higher degree of control...
I like it. done
the `ParDo` these state variables can be used to write or update state for the current key or to read previous state | ||
written for that key. State is always fully scoped only to the current processing key. | ||
|
||
Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This |
It is also important to point out that you can use different windowing strategies before the State call; I have seen folks not realise this until it was pointed out.
For example, Window+Combiner as a reducer before the data reaches the state DoFn.
done
to other nodes in the graph. A `DoFn` can declare multiple state variables. | ||
|
||
If the type of the ValueState has a coder registered, then Beam will automatically infer the coder for the state value. | ||
For example, the following ParDo creates a single state variable that accumulates the number of elements seen. |
Worth explicitly calling out the lack of ordering? Although it doesn't matter for this use case, ValueState can be misused for other things by someone who is not aware that there is no ordering guarantee.
done
|
||
``` | ||
#### 10.3.2 Processing-time timers {#processing-time-timers} | ||
Processing-time timers fire when the real wall-clock time passes. This is often used to create larger batches of data |
Worth mentioning the expected behaviour if used in a batch pipeline. If I recall, the direct runner doesn't fire processing-time timers?
Force-pushed from 6052fb0 to b4824b1.
No description provided.