Skip to content

Commit

Permalink
Update ParDoTest.java
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Jan 8, 2020
1 parent 83d90fd commit 7d60044
Showing 1 changed file with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3729,48 +3729,78 @@ public void testTwoTimersSettingEachOtherWithCreateAsInput() {
pipeline.run();
}

@Test
@Test
@Category({
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesTimersInParDo.class,
UsesTestStream.class
})
public void testValueStateSimple() {
public void testOutputTimestamp() {
final String timerId = "bar";
DoFn<KV<String, Integer>, Integer> fn =
new DoFn<KV<String, Integer>, Integer>() {
DoFn<KV<String, Integer>, KV<String, Integer>> fn1 =
new DoFn<KV<String, Integer>, KV<String, Integer>>() {

@TimerId(timerId)
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(ProcessContext c, @TimerId(timerId) Timer timer) {
timer.withOutputTimestamp(new Instant(5)).set(new Instant(8));
public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<KV<String, Integer>> o) {
timer.withOutputTimestamp(new Instant(5)).set(new Instant(10));
// Output a message. This will cause the next DoFn to set a timer as well.
o.output(KV.of("foo", 100));
}

@OnTimer(timerId)
public void onTimer(OnTimerContext c, BoundedWindow w) {
c.output(100);
}
};

DoFn<KV<String, Integer>, Integer> fn2 =
new DoFn<KV<String, Integer>, Integer>() {

@TimerId(timerId)
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@StateId("timerFired")
final StateSpec<ValueState<Boolean>> timerFiredState = StateSpecs.value();

@ProcessElement
public void processElement(@TimerId(timerId) Timer timer,
@StateId("timerFired") ValueState<Boolean> timerFiredState) {
Boolean timerFired = timerFiredState.read();
assertTrue(timerFired == null || timerFired == false);
// Set a timer to 8. This is earlier than the previous DoFn's timer, but after the previous
// DoFn timer's watermark hold. This timer should not fire until the previous timer fires and removes
// the watermark hold.
timer.set(new Instant(8));
}

@OnTimer(timerId)
public void onTimer(@StateId("timerFired") ValueState<Boolean> timerFiredState, OutputReceiver<Integer> o) {
timerFiredState.write(true);
o.output(100);
}
};

TestStream<KV<String, Integer>> stream =
TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
.advanceWatermarkTo(new Instant(0))
// Cause fn2 to set a timer.
.addElements(KV.of("key", 1))
.advanceWatermarkToInfinity();
// Normally this would case fn2's timer to expire, but it shouldn't here because of the output timestamp.
.advanceWatermarkTo(new Instant(9))
// If the timer fired, then this would case fn2 to fail with an assertion error.
.addElements(KV.of("key", 1))
.advanceWatermarkToInfinity();
PCollection<Integer> output =
pipeline
.apply(stream)
.apply(
Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(7))) // window
.discardingFiredPanes())
.apply("first", ParDo.of(fn));
.apply("first", ParDo.of(fn1))
.apply("second", ParDo.of(fn2));

Instant base = new Instant(0);
PAssert.that(output)
.inWindow(new IntervalWindow(base, base.plus(Duration.millis(7)))) // interval window
.containsInAnyOrder(100); // result output
pipeline.run();
}
Expand Down

0 comments on commit 7d60044

Please sign in to comment.