Skip to content

Commit

Permalink
Merge pull request apache#10622: [BEAM=6857] Fix timermap test to not…
Browse files Browse the repository at this point in the history
… use TestStream
  • Loading branch information
reuvenlax committed Jan 18, 2020
1 parent b5a75ae commit ebe8bdc
Showing 1 changed file with 6 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4297,12 +4297,7 @@ public void onTimer(OutputReceiver<String> r, @TimerId("timer") Timer timer) {
public static class TimerFamilyTests extends SharedTestBase implements Serializable {

@Test
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesTestStream.class,
UsesTimerMap.class
})
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyEventTime() throws Exception {
final String timerFamilyId = "foo";

Expand Down Expand Up @@ -4332,24 +4327,14 @@ public void onTimer(
}
};

TestStream<KV<String, Integer>> stream =
TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
.advanceWatermarkTo(new Instant(0))
.addElements(KV.of("hello", 37))
.advanceWatermarkToInfinity();

PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
PCollection<String> output =
pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2");
pipeline.run();
}

@Test
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesTestStream.class,
UsesTimerMap.class
})
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerWithMultipleTimerFamily() throws Exception {
final String timerFamilyId1 = "foo";
final String timerFamilyId2 = "bar";
Expand Down Expand Up @@ -4386,13 +4371,8 @@ public void onTimer2(
}
};

TestStream<KV<String, Integer>> stream =
TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
.advanceWatermarkTo(new Instant(0))
.addElements(KV.of("hello", 37))
.advanceWatermarkToInfinity();

PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn));
PCollection<String> output =
pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer", "timer");
pipeline.run();
}
Expand Down

0 comments on commit ebe8bdc

Please sign in to comment.