Skip to content

Commit

Permalink
[FLINK-7635] Add side-output test in WindowOperatorContractTest
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 25, 2017
1 parent c151a53 commit 1ebd44a
Showing 1 changed file with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,60 @@ public void testLateSideOutput() throws Exception {

}

/**
* This also verifies that the timestamps ouf side-emitted records is correct.
*/
@Test
public void testSideOutput() throws Exception {

final OutputTag<Integer> integerOutputTag = new OutputTag<Integer>("int-out") {};
final OutputTag<Long> longOutputTag = new OutputTag<Long>("long-out") {};

WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();

InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> windowFunction =
new InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow>() {
@Override
public void process(
Integer integer,
TimeWindow window,
InternalWindowContext ctx,
Iterable<Integer> input,
Collector<Void> out) throws Exception {
Integer inputValue = input.iterator().next();

ctx.output(integerOutputTag, inputValue);
ctx.output(longOutputTag, inputValue.longValue());
}

@Override
public void clear(
TimeWindow window,
InternalWindowContext context) throws Exception {}
};

OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 0L, windowFunction);

testHarness.open();

final long windowEnd = 42L;

when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Collections.singletonList(new TimeWindow(0, windowEnd)));

shouldFireOnElement(mockTrigger);

testHarness.processElement(new StreamRecord<>(17, 5L));

assertThat(testHarness.getSideOutput(integerOutputTag),
contains(isStreamRecord(17, windowEnd - 1)));

assertThat(testHarness.getSideOutput(longOutputTag),
contains(isStreamRecord(17L, windowEnd - 1)));
}

@Test
public void testAssignerIsInvokedOncePerElement() throws Exception {

Expand Down

0 comments on commit 1ebd44a

Please sign in to comment.