Skip to content

Commit

Permalink
[hotfix] [docs] Fix ProcessWindowFunction code snippets.
Browse files Browse the repository at this point in the history
This closes apache#6527.
  • Loading branch information
fhueske committed Aug 10, 2018
1 parent a442eb6 commit 396bdd1
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions docs/dev/stream/operators/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -724,17 +724,19 @@ A `ProcessWindowFunction` can be defined and used like this:
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction());
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

void process(String key, Context context, Iterable<Tuple<String, Long>> input, Collector<String> out) {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple<String, Long> in: input) {
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
Expand All @@ -749,9 +751,9 @@ public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String,
val input: DataStream[(String, Long)] = ...

input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction())
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())

/* ... */

Expand Down

0 comments on commit 396bdd1

Please sign in to comment.