[FLINK-12508] [docs] extend section on unit testing Flink operators
knaufk authored and rmetzger committed May 17, 2019
1 parent 2937b60 commit 5f3a769
276 changes: 256 additions & 20 deletions docs/dev/stream/testing.md
specific language governing permissions and limitations
under the License.
-->

Testing is an integral part of every software development process as such Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.

* This will be replaced by the TOC
{:toc}

## Testing User-Defined Functions

Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.

### Unit Testing Stateless, Timeless UDFs


For example, let's take the following stateless `MapFunction`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class IncrementMapFunction extends MapFunction[Long, Long] {

  override def map(record: Long): Long = {
    record + 1
  }
}
{% endhighlight %}
</div>
</div>

It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class IncrementMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // call the methods that you have implemented
        assertEquals(3L, incrementer.map(2L));
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class IncrementMapFunctionTest extends FlatSpec with Matchers {

  "IncrementMapFunction" should "increment values" in {
    // instantiate your function
    val incrementer: IncrementMapFunction = new IncrementMapFunction()

    // call the methods that you have implemented
    incrementer.map(2) should be (3)
  }
}
{% endhighlight %}
</div>
</div>

Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class IncrementFlatMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

        Collector<Long> collector = mock(Collector.class);

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector);

        // verify collector was called with the right output
        Mockito.verify(collector, times(1)).collect(3L);
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {

  "IncrementFlatMapFunction" should "increment values" in {
    // instantiate your function
    val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()

    val collector = mock[Collector[Long]]

    // verify collector was called with the right output
    (collector.collect _).expects(3)

    // call the methods that you have implemented
    incrementer.flatMap(2, collector)
  }
}
{% endhighlight %}
</div>
</div>
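If you prefer to avoid a mocking framework, the same behavior can be checked with a simple list-backed collector and plain assertions. The sketch below is self-contained: the `Collector` interface is a minimal stand-in for Flink's `org.apache.flink.util.Collector`, and the `IncrementFlatMapFunction` mirrors the hypothetical function used above.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementFlatMapFunctionTest {

    // Minimal stand-in for org.apache.flink.util.Collector (a real test would
    // import the actual interface from Flink instead).
    interface Collector<T> {
        void collect(T record);
    }

    // Collector that simply buffers everything it receives.
    static class ListCollector<T> implements Collector<T> {
        final List<T> output = new ArrayList<>();

        @Override
        public void collect(T record) {
            output.add(record);
        }
    }

    // Hypothetical function under test, mirroring the IncrementMapFunction above.
    static class IncrementFlatMapFunction {
        public void flatMap(Long record, Collector<Long> out) {
            out.collect(record + 1);
        }
    }

    public static void main(String[] args) {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        ListCollector<Long> collector = new ListCollector<>();

        // call the method under test
        incrementer.flatMap(2L, collector);

        // assert on the buffered output instead of verifying mock interactions
        if (!collector.output.equals(List.of(3L))) {
            throw new AssertionError("unexpected output: " + collector.output);
        }
        System.out.println("ok");
    }
}
```

This style keeps the test free of mocking-library dependencies and makes the captured output available for arbitrary assertions.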

### Unit Testing Stateful or Timely UDFs & Custom Operators

Testing the functionality of a user-defined function that makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink's runtime.
For this, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:

* `OneInputStreamOperatorTestHarness` (for operators on `DataStream`s)
* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
* `TwoInputStreamOperatorTestHarness` (for operators of `ConnectedStreams` of two `DataStream`s)
* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)

To use the test harnesses, a set of additional test-scoped dependencies is needed.

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
  <version>{{ site.version }}</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
  <version>{{ site.version }}</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
  <version>{{ site.version }}</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>
{% endhighlight %}

Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time, and finally assert on the output of the operator (including side outputs).

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user-defined function into the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configure the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {

        // push (timestamped) elements into the operator (and hence the user-defined function)
        testHarness.processElement(2L, 100L);

        // trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        // trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        // retrieve list of emitted records for assertions
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        // assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));
    }
}

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
  private var statefulFlatMapFunction: StatefulFlatMapFunction = null

  before {
    // instantiate user-defined function
    statefulFlatMapFunction = new StatefulFlatMapFunction

    // wrap user-defined function into the corresponding operator
    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMapFunction))

    // optionally configure the execution environment
    testHarness.getExecutionConfig().setAutoWatermarkInterval(50)

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open()
  }

  "StatefulFlatMapFunction" should "do some fancy stuff with timers and state" in {

    // push (timestamped) elements into the operator (and hence the user-defined function)
    testHarness.processElement(2, 100)

    // trigger event time timers by advancing the event time of the operator with a watermark
    testHarness.processWatermark(100)

    // trigger processing time timers by advancing the processing time of the operator directly
    testHarness.setProcessingTime(100)

    // retrieve list of emitted records for assertions
    testHarness.getOutput should contain (3)

    // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
    // testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
  }
}
{% endhighlight %}
</div>
</div>
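The `StatefulFlatMapFunction` under test is not shown on this page. Purely for illustration, here is a minimal, self-contained sketch of what such a function might look like; the `ValueState` and `Collector` types below are stand-ins for the real Flink interfaces (`org.apache.flink.api.common.state.ValueState`, obtained via `getRuntimeContext().getState(...)`, and `org.apache.flink.util.Collector`), and the function's behavior is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class StatefulFlatMapSketch {

    // Minimal stand-in for Flink's ValueState (assumption: real code would use
    // the actual keyed-state API, scoped to the current key by the runtime).
    static class ValueState<T> {
        private T value;
        T value() { return value; }
        void update(T newValue) { value = newValue; }
    }

    // Minimal stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical stateful function: emits record + 1 and remembers the last
    // record in keyed state, e.g. for later use in a timer callback.
    static class StatefulFlatMapFunction {
        final ValueState<Long> lastSeen = new ValueState<>();

        public void flatMap(Long record, Collector<Long> out) {
            lastSeen.update(record);
            out.collect(record + 1);
        }
    }

    public static void main(String[] args) {
        StatefulFlatMapFunction fn = new StatefulFlatMapFunction();
        List<Long> output = new ArrayList<>();

        fn.flatMap(2L, output::add);

        // the function emitted record + 1 and stored the record in state
        if (!output.equals(List.of(3L)) || fn.lastSeen.value() != 2L) {
            throw new AssertionError("unexpected behaviour: " + output);
        }
        System.out.println("ok");
    }
}
```

In a real test the harness, not the test code, manages the state backend, which is exactly the runtime interaction the harnesses exist to exercise.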

`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` and the `TypeInformation` of the key class.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

public class StatefulFlatMapFunctionTest {
    private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user-defined function into the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    // tests

}

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
  private var statefulFlatMapFunction: StatefulFlatMapFunction = null

  before {
    // instantiate user-defined function
    statefulFlatMapFunction = new StatefulFlatMapFunction

    // wrap user-defined function into the corresponding operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING)

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open()
  }

  // tests

}
{% endhighlight %}
</div>
</div>
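The `MyStringKeySelector` referenced above is not shown on this page. A minimal sketch of what such a key selector might look like follows, assuming records are keyed by their decimal string representation; the `KeySelector` interface below is a stand-in for Flink's `org.apache.flink.api.java.functions.KeySelector`, and the selector itself is hypothetical.

```java
import java.util.Objects;

public class MyStringKeySelectorSketch {

    // Minimal stand-in for Flink's KeySelector interface (a real test would
    // implement the actual interface from flink-core).
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical key selector matching the String key type used by the
    // KeyedOneInputStreamOperatorTestHarness above: each Long record is keyed
    // by its decimal string representation.
    static class MyStringKeySelector implements KeySelector<Long, String> {
        @Override
        public String getKey(Long value) {
            return String.valueOf(value);
        }
    }

    public static void main(String[] args) throws Exception {
        KeySelector<Long, String> selector = new MyStringKeySelector();

        // the key must be deterministic: the same record always yields the same key
        if (!Objects.equals(selector.getKey(42L), "42")) {
            throw new AssertionError("unexpected key: " + selector.getKey(42L));
        }
        System.out.println("ok");
    }
}
```

Whatever the real selector does, it must be deterministic, since Flink relies on the key to partition records and scope keyed state.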

Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:

* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions, which depend on processing or event time.
* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to test its interaction with Flink's checkpointing mechanism.

<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.

## Integration testing

In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.

