[FLINK-3995] [build] flink-test-utils also contains the streaming test utilities.

The test utilities include StreamingMultipleProgramsTestBase and StreamingTestEnvironment.

This moves the ITCases for streaming into 'flink-tests' to achieve that.

This closes apache#2092
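
The change that recurs throughout the test sources below is the replacement of the module-private NoOpSink test helper with the public org.apache.flink.streaming.api.functions.sink.DiscardingSink. A minimal sketch of the resulting pattern, assuming a standalone driver class (the class name and sequence bounds are illustrative, not part of this commit):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class DiscardingSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// DiscardingSink terminates the topology and silently drops every
		// record, which is all a test topology needs from its sink.
		DataStream<Long> stream = env.generateSequence(0, 100);
		stream.addSink(new DiscardingSink<Long>());

		env.execute("discarding-sink-sketch");
	}
}

Because DiscardingSink lives in flink-streaming-java proper, tests no longer need the test-jar of a connector module just to cap a pipeline.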
StephanEwen authored and rmetzger committed Jul 5, 2016
1 parent 4b71e0e commit b9f42e9
Showing 46 changed files with 169 additions and 970 deletions.
@@ -24,10 +24,11 @@
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import org.junit.Assert;
import org.junit.Test;

14 changes: 1 addition & 13 deletions flink-streaming-java/pom.xml
@@ -78,7 +78,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
@@ -95,18 +95,6 @@ under the License.

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- disable fork reuse for the streaming project, because of
incorrect declaration of tests -->
<plugin>
@@ -39,6 +39,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -57,15 +58,14 @@
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;

import org.junit.Test;

import static org.junit.Assert.*;

@SuppressWarnings("serial")
public class DataStreamTest extends StreamingMultipleProgramsTestBase {
public class DataStreamTest {

/**
* Tests union functionality. This ensures that self-unions and unions of streams
@@ -452,7 +452,7 @@ public Long fold(Long accumulator, Long value) throws Exception {
}
});

windowed.addSink(new NoOpSink<Long>());
windowed.addSink(new DiscardingSink<Long>());

DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
private static final long serialVersionUID = 1L;
@@ -486,7 +486,7 @@ public void invoke(Long value) throws Exception {
}

DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
parallelSource.addSink(new NoOpSink<Long>());
parallelSource.addSink(new DiscardingSink<Long>());
assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());

parallelSource.setParallelism(3);
@@ -557,7 +557,7 @@ public Integer map(Long value) throws Exception {
}
};
DataStream<Integer> map = src.map(mapFunction);
map.addSink(new NoOpSink<Integer>());
map.addSink(new DiscardingSink<Integer>());
assertEquals(mapFunction, getFunctionForDataStream(map));


@@ -569,7 +569,7 @@ public void flatMap(Long value, Collector<Integer> out) throws Exception {
}
};
DataStream<Integer> flatMap = src.flatMap(flatMapFunction);
flatMap.addSink(new NoOpSink<Integer>());
flatMap.addSink(new DiscardingSink<Integer>());
assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));

FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
@@ -582,7 +582,7 @@ public boolean filter(Integer value) throws Exception {
DataStream<Integer> unionFilter = map.union(flatMap)
.filter(filterFunction);

unionFilter.addSink(new NoOpSink<Integer>());
unionFilter.addSink(new DiscardingSink<Integer>());

assertEquals(filterFunction, getFunctionForDataStream(unionFilter));

@@ -606,7 +606,7 @@ public Iterable<String> select(Integer value) {
};

SplitStream<Integer> split = unionFilter.split(outputSelector);
split.select("dummy").addSink(new NoOpSink<Integer>());
split.select("dummy").addSink(new DiscardingSink<Integer>());
List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
assertEquals(1, outputSelectors.size());
assertEquals(outputSelector, outputSelectors.get(0));
@@ -632,7 +632,7 @@ public String map2(Integer value) {
}
};
DataStream<String> coMap = connect.map(coMapper);
coMap.addSink(new NoOpSink<String>());
coMap.addSink(new DiscardingSink<String>());
assertEquals(coMapper, getFunctionForDataStream(coMap));

try {
@@ -772,7 +772,7 @@ public Object map2(Tuple2<Long, Long> value) {
return null;
}
});
coMap.addSink(new NoOpSink());
coMap.addSink(new DiscardingSink());
return coMap.getId();
}

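The hunks above also drop the StreamingMultipleProgramsTestBase superclass from tests that only build and inspect the StreamGraph without ever executing a job. A hedged sketch of that pattern, mirroring the assertions in this diff (the class and method names are illustrative):

import static org.junit.Assert.assertEquals;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import org.junit.Test;

public class GraphOnlyTestSketch {

	@Test
	public void parallelismIsRecordedInTheStreamGraph() {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Building the graph needs no mini-cluster, so no test base class is required.
		DataStreamSource<Long> src = env.generateSequence(0, 0);
		src.setParallelism(3);
		src.addSink(new DiscardingSink<Long>());

		assertEquals(3, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
	}
}
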
@@ -31,18 +31,18 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.SplittableIterator;

import org.junit.Test;

public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
public class StreamExecutionEnvironmentTest {

@Test
public void fromElementsWithBaseTypeTest1() {
@@ -73,18 +73,18 @@ public void testFromCollectionParallelism() {
// expected
}

dataStream1.addSink(new NoOpSink<Integer>());
dataStream1.addSink(new DiscardingSink<Integer>());

DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
typeInfo).setParallelism(4);

dataStream2.addSink(new NoOpSink<Integer>());
dataStream2.addSink(new DiscardingSink<Integer>());

String plan = env.getExecutionPlan();
env.getExecutionPlan();

assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
assertEquals("Parallelism of parallel collection source must be 4.",
4,
4,
env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
}
catch (Exception e) {
@@ -109,7 +109,7 @@ public void cancel() {
}
};
DataStreamSource<Integer> src1 = env.addSource(srcFun);
src1.addSink(new NoOpSink<Integer>());
src1.addSink(new DiscardingSink<Integer>());
assertEquals(srcFun, getFunctionFromDataSource(src1));

List<Long> list = Arrays.asList(0L, 1L, 2L);
@@ -135,8 +135,9 @@ private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
return streamGraph.getStreamNode(dataStream.getId()).getOperator();
}

@SuppressWarnings("unchecked")
private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
dataStreamSource.addSink(new NoOpSink<T>());
dataStreamSource.addSink(new DiscardingSink<T>());
AbstractUdfStreamOperator<?, ?> operator =
(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
return (SourceFunction<T>) operator.getUserFunction();
@@ -31,12 +31,12 @@
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;

import org.junit.Test;

@SuppressWarnings("serial")
public class TypeFillTest extends StreamingMultipleProgramsTestBase {
public class TypeFillTest {

@Test
public void test() {
@@ -25,6 +25,7 @@

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;

import org.junit.Test;

public class OutputSelectorTest {
@@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -39,7 +40,6 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.NoOpSink;

import org.junit.Test;

@@ -49,8 +49,7 @@
/**
* Tests for {@link StreamGraphGenerator}. This only tests correct translation of split/select,
* union, partitioning since the other translation routines are tested already in operation
* specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for
* iterations.
* specific tests.
*/
public class StreamGraphGeneratorTest {

@@ -77,7 +76,7 @@ public void testVirtualTransformations() throws Exception {
.broadcast()
.map(new NoOpIntMap());

broadcastMap.addSink(new NoOpSink<Integer>());
broadcastMap.addSink(new DiscardingSink<Integer>());

// verify that partitioning is preserved across union and split/select
EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
@@ -113,7 +112,7 @@ public void testVirtualTransformations() throws Exception {
SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
.map(new NoOpIntMap());

unionedMap.addSink(new NoOpSink<Integer>());
unionedMap.addSink(new DiscardingSink<Integer>());

StreamGraph graph = env.getStreamGraph();

@@ -169,7 +168,7 @@ public void testVirtualTransformations2() throws Exception {
.select("foo")
.map(new NoOpIntMap());

unionedMap.addSink(new NoOpSink<Integer>());
unionedMap.addSink(new DiscardingSink<Integer>());

StreamGraph graph = env.getStreamGraph();

@@ -207,9 +206,9 @@ public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
BasicTypeInfo.INT_TYPE_INFO,
outputTypeConfigurableOperation);

result.addSink(new NoOpSink<Integer>());
result.addSink(new DiscardingSink<Integer>());

StreamGraph graph = env.getStreamGraph();
env.getStreamGraph();

assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
}
@@ -230,9 +229,9 @@ public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
BasicTypeInfo.INT_TYPE_INFO,
outputTypeConfigurableOperation);

result.addSink(new NoOpSink<Integer>());
result.addSink(new DiscardingSink<Integer>());

StreamGraph graph = env.getStreamGraph();
env.getStreamGraph();

assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
}
@@ -17,29 +17,19 @@

package org.apache.flink.streaming.api.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.HashSet;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.streaming.api.datastream.StreamProjection;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestHarnessUtil;

import org.junit.Test;
@@ -52,7 +42,7 @@
* <li>Watermarks are correctly forwarded</li>
* </ul>
*/
public class StreamProjectTest extends StreamingMultipleProgramsTestBase {
public class StreamProjectTest {

@Test
public void testProject() throws Exception {
@@ -91,47 +81,4 @@ public void testProject() throws Exception {

TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}


// tests using projection from the API without explicitly specifying the types
private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();

@Test
public void APIWithoutTypesTest() {

for (Long i = 1L; i < 11L; i++) {
expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
private static final long serialVersionUID = 1L;

@Override
public Tuple3<Long, Character, Double> map(Long value) throws Exception {
return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
}
})
.project(0, 2)
.addSink(new SinkFunction<Tuple>() {
private static final long serialVersionUID = 1L;

@Override
@SuppressWarnings("unchecked")
public void invoke(Tuple value) throws Exception {
actual.add( (Tuple2<Long,Double>) value);
}
});

try {
env.execute();
} catch (Exception e) {
fail(e.getMessage());
}

assertEquals(expected, actual);
}
}
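
The deleted APIWithoutTypesTest executed a full program via env.execute(), which is what makes it an integration test; per the commit message, such tests now belong in 'flink-tests', where StreamingMultipleProgramsTestBase is available from flink-test-utils. A hedged sketch of how the relocated test could look (the class name, and the use of DiscardingSink in place of the original collecting sink, are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;

import org.junit.Test;

public class ProjectITCaseSketch extends StreamingMultipleProgramsTestBase {

	@Test
	public void testProjectionWithoutExplicitTypes() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		env.generateSequence(1, 10)
			.map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
				}
			})
			// project(0, 2) keeps fields 0 and 2; the result type is inferred.
			.project(0, 2)
			.addSink(new DiscardingSink<Tuple>());

		env.execute();
	}
}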