Skip to content

Commit

Permalink
[FLINK-2419] Add test for sinks after keyBy and groupBy
Browse files Browse the repository at this point in the history
Closes apache#947
  • Loading branch information
gyfora committed Jul 29, 2015
1 parent b211a62 commit 73af891
Showing 1 changed file with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,48 @@ public String map2(Integer value) {
fail(e.getMessage());
}
}

@Test
public void sinkKeyTest() {
StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
StreamGraph streamGraph = env.getStreamGraph();

DataStream<Long> sink = env.generateSequence(1, 100).print();
assertTrue(streamGraph.getStreamNode(sink.getId()).getStatePartitioner() == null);
assertTrue(streamGraph.getStreamNode(sink.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);

KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {

private static final long serialVersionUID = 1L;

@Override
public Long getKey(Long value) throws Exception {
return (long) 0;
}
};

DataStream<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();

assertTrue(streamGraph.getStreamNode(sink2.getId()).getStatePartitioner() != null);
assertEquals(key1, streamGraph.getStreamNode(sink2.getId()).getStatePartitioner());
assertTrue(streamGraph.getStreamNode(sink2.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);

KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {

private static final long serialVersionUID = 1L;

@Override
public Long getKey(Long value) throws Exception {
return (long) 0;
}
};

DataStream<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();

assertTrue(streamGraph.getStreamNode(sink3.getId()).getStatePartitioner() != null);
assertEquals(key2, streamGraph.getStreamNode(sink3.getId()).getStatePartitioner());
assertTrue(streamGraph.getStreamNode(sink3.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);
}

@Test
public void testChannelSelectors() {
Expand Down

0 comments on commit 73af891

Please sign in to comment.