State timers documentation #10854

Merged
merged 10 commits into from
Feb 27, 2020

reuvenlax
Contributor

No description provided.

@reuvenlax
Contributor Author

R: @dpmills
R: @rezarokni

Beam provides several types of state:

#### ValueState
A DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each
Contributor

This paragraph shouldn't be under ValueState; it should be in the State and Timers section

Contributor Author

done

A DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each
state must be named using the `StateId` annotation; this name is unique to a ParDo in the graph and has no relation
to other nodes in the graph. A `DoFn` can declare multiple state variables.

Contributor

Need an intro for what ValueState is

Contributor Author

done

```

### 10.3 Timers {#timers}
Beam provides a per-key timer callback API. This allows for delayed processing of data aggregated using the state API.
Contributor

s/aggregated/stored/

Contributor Author

done

@ProcessElement public void process(@TimerId("timer") Timer timer) {
...
// Set a timer to go off 30 seconds in the future.
timer.offset(Duration.standardSeconds(30).setRelative();
Contributor

Missing a paren. Would read better as
timer.setRelative().offset(Duration.standardSeconds(30))

Contributor Author

Unfortunately today the timer is set in the setRelative() method, so the subsequent offset() does not do anything!

Contributor Author

Also setRelative() is a void method :)

Contributor Author

fixed paren

For processing-time timers, the default output timestamp and watermark hold is the value of the input watermark at the
time the timer was set.

In some cases, a pipeline needs to output timestamps earlier than the timer expiration time, and therefore also needs to
Contributor

s/pipeline/DoFn/

Contributor Author

done

}
````

#### 10.5.2 Batching RPCs {#batching-rpcs}
Contributor

Might be worth including a link to the GroupIntoBatches transform here

@ProcessElement public void process(
@Element KV<String, ValueT> element,
@StateId("elementBag") BagState<ValueT> elementBag,
@StateId("timerSet") ValueState<Boolean> timerSet,
Contributor

`timerSet` is a confusing name; use `isTimerSet` or `timerIsSet` instead, here and below.

Contributor Author

done
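
For readers following this thread, the pattern in the snippet under review (buffer elements per key in a bag, arm a single timer guarded by a boolean flag, flush when the timer fires) can be modeled roughly in plain Java. This is a sketch under stated assumptions, not Beam code: `KeyedBatcher`, `process`, `onTimer` and `timersArmed` are hypothetical names, and the two maps only stand in for the per-key `BagState` and the `ValueState<Boolean>` flag discussed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the per-key "buffer, then flush on a timer" pattern.
// Not Beam code: every class and method name here is hypothetical.
class KeyedBatcher<V> {
  // Stands in for the per-key BagState of buffered elements.
  private final Map<String, List<V>> elementBag = new HashMap<>();
  // Stands in for the per-key ValueState<Boolean> flag (isTimerSet).
  private final Map<String, Boolean> isTimerSet = new HashMap<>();
  // Counts how many timers were armed, so the behaviour is observable.
  int timersArmed = 0;

  // Models the element callback: buffer the value, arm the timer once per batch.
  void process(String key, V value) {
    elementBag.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    if (!isTimerSet.getOrDefault(key, false)) {
      timersArmed++;
      isTimerSet.put(key, true);
    }
  }

  // Models the timer callback: return the batch (one RPC's worth) and reset state.
  List<V> onTimer(String key) {
    List<V> batch = elementBag.remove(key);
    isTimerSet.remove(key);
    return batch == null ? new ArrayList<>() : batch;
  }
}

class KeyedBatcherDemo {
  public static void main(String[] args) {
    KeyedBatcher<String> batcher = new KeyedBatcher<>();
    batcher.process("user1", "a");
    batcher.process("user1", "b");
    batcher.process("user1", "c");
    System.out.println(batcher.timersArmed);      // prints 1
    System.out.println(batcher.onTimer("user1")); // prints [a, b, c]
  }
}
```

The flag matters because, without it, every incoming element would re-arm the timer. Clearing the flag in the timer callback lets the next element start a fresh batch.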


#### 10.3.3 Dynamic timer tags {#dynamic-timer-tags}
Beam also supports dynamically setting a timer tag using `TimerMap`. This allows for setting multiple different timers
in a `ParDo` and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A
Contributor

Be consistent between using DoFn vs ParDo; I think DoFn is probably better

Contributor Author

done

A common use case for state is to accumulate multiple elements. `BagState` allows for accumulating elements in an
unordered manner. This allows for addition of elements to the collection without requiring the reading of the entire
collection first, which is an efficiency gain. In addition, runners that support paged reads can allow individual
bag collections larger than available memory.
Contributor

bag collections seems redundant; maybe just "bags"?

Contributor Author

done
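
The efficiency claim in the paragraph under review (appending to a bag does not require reading the collection first) can be illustrated with a small plain-Java model. It is only a sketch: `CountingStore` and its methods are hypothetical and say nothing about how any Beam runner actually stores state. It simply counts how many stored elements must be loaded under each access style.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state backend that counts the elements it loads.
class CountingStore<T> {
  private final List<T> stored = new ArrayList<>();
  long elementsRead = 0;

  // Value-style access: the caller loads the whole list before updating it.
  List<T> readAll() {
    elementsRead += stored.size();
    return new ArrayList<>(stored);
  }

  // Value-style access: the whole list is written back.
  void writeAll(List<T> replacement) {
    stored.clear();
    stored.addAll(replacement);
  }

  // Bag-style access: a blind append that never loads existing elements.
  void append(T element) {
    stored.add(element);
  }
}

class BagAppendDemo {
  // Adds n elements via read-modify-write; returns how many elements were read.
  static long readModifyWrite(int n) {
    CountingStore<Integer> store = new CountingStore<>();
    for (int i = 0; i < n; i++) {
      List<Integer> all = store.readAll();
      all.add(i);
      store.writeAll(all);
    }
    return store.elementsRead;
  }

  // Adds n elements via blind appends; returns how many elements were read.
  static long blindAppend(int n) {
    CountingStore<Integer> store = new CountingStore<>();
    for (int i = 0; i < n; i++) {
      store.append(i);
    }
    return store.elementsRead;
  }

  public static void main(String[] args) {
    System.out.println(readModifyWrite(100)); // prints 4950 (0 + 1 + ... + 99)
    System.out.println(blindAppend(100));     // prints 0
  }
}
```

In this model the read-modify-write style loads a number of elements that grows quadratically with the number of additions, while the append style loads none.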

}));
```
### 10.2 Deferred state reads {#deferred-state-reads}
When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the read() function
Contributor

put code in backticks

Contributor Author

done

Contributor

@rezarokni left a comment

Minor comments.


## 10. State and Timers {#state-and-timers}
Beam's windowing and triggering facilities provide a powerful abstraction for grouping and aggregating unbounded input
data based on timestamps. However there are aggregation use cases for which Beam's windows and triggers are not the best
Contributor

However there are aggregation use cases for which developers may require a higher degree of control...

Contributor Author

I like it. done

the `ParDo` these state variables can be used to write or update state for the current key or to read previous state
written for that key. State is always fully scoped only to the current processing key.

Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This
Contributor

It is also important to point out that you can apply different windowing strategies before the stateful step; I have seen people not realise this until it was pointed out to them.
For example, Window + Combiner can act as a reducer before the data reaches the stateful DoFn.

Contributor Author

done

to other nodes in the graph. A `DoFn` can declare multiple state variables.

If the type of the ValueState has a coder registered, then Beam will automatically infer the coder for the state value.
For example, the following ParDo creates a single state variable that accumulates the number of elements seen.
Contributor

Worth explicitly calling out the lack of ordering? Although it doesn't matter for this use case, ValueState can be misused for other things by someone who isn't aware that element order is not guaranteed.

Contributor Author

done


```
#### 10.3.2 Processing-time timers {#processing-time-timers}
Processing-time timers fire when the real wall-clock time passes. This is often used to create larger batches of data
Contributor

Worth mentioning the expected behaviour when used in a batch pipeline. If I recall correctly, the direct runner doesn't fire processing-time timers?

@reuvenlax removed the runners label on Feb 25, 2020
@reuvenlax merged commit 94e6978 into apache:master on Feb 27, 2020
3 participants