Many more examples of how to use these test harnesses can be found in the Flink code base.

<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.

## Testing Flink Jobs

### JUnit Rule `MiniClusterWithClientResource`

Apache Flink provides a JUnit rule called `MiniClusterWithClientResource` for testing complete jobs against a local, embedded mini cluster.

To use `MiniClusterWithClientResource`, one additional dependency (test scoped) is needed:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
  <version>{{ site.version }}</version>
  <scope>test</scope>
</dependency>
{% endhighlight %}
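For orientation, the test classes below assume imports along the following lines. This is a sketch based on Flink 1.8 package locations and JUnit 4; adjust the paths to your versions.

{% highlight java %}
// Assumed imports for the examples below (package paths as of Flink 1.8 / JUnit 4).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.ClassRule;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
{% endhighlight %}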

Let us take the same simple `MapFunction` as in the previous sections.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class IncrementMapFunction extends MapFunction[Long, Long] {

  override def map(record: Long): Long = {
    record + 1
  }
}
{% endhighlight %}
</div>
</div>

A simple pipeline using this `MapFunction` can now be tested in a local Flink cluster as follows.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public class ExampleIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
            .map(new IncrementMapFunction())
            .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static, and thread-safe: sink instances run in parallel
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "IncrementMapFunction pipeline" should "increment values" in {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1L, 21L, 22L)
      .map(new IncrementMapFunction())
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf (2L, 22L, 23L)
  }
}

// create a testing sink
class CollectSink extends SinkFunction[Long] {

  override def invoke(value: Long): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {

  // must be static, and thread-safe: sink instances run in parallel
  val values: util.List[Long] = util.Collections.synchronizedList(new util.ArrayList())
}
{% endhighlight %}
</div>
</div>

A few remarks on integration testing with `MiniClusterWithClientResource`:

* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests (see the `IncrementJob` sketch after this list).

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
{% endhighlight %}
</div>
</div>
* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
Alternatively, you could write the data to files in a temporary directory with your test sink.

* You can implement a custom *parallel* source function for emitting watermarks if your job uses event time timers (see the `TimestampedTestSource` sketch after this list).

* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface when the pipeline is executed in parallel.

* Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.

* If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline (see the `FailOnceMapper` sketch after this list).
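
To illustrate the first remark, here is a minimal sketch of what pluggable sources and sinks can look like. The class `IncrementJob` and its method `buildPipeline` are hypothetical names chosen for this example, not part of any Flink API.

{% highlight java %}
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical job class: the transformation logic is defined once against an
// arbitrary input stream, so production code and tests can attach different
// sources and sinks.
public class IncrementJob {

    public static DataStream<Long> buildPipeline(DataStream<Long> input) {
        return input.map(new IncrementMapFunction());
    }
}

// In a test, a bounded test source and the CollectSink from above are injected:
//
//     IncrementJob.buildPipeline(env.fromElements(1L, 21L, 22L))
//         .addSink(new CollectSink());
{% endhighlight %}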
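
For the remark on watermarks, the following is a sketch of a custom parallel test source that emits timestamped records and watermarks. The class `TimestampedTestSource` and its data are made up for illustration; the `SourceContext` methods it uses belong to the regular `SourceFunction` API.

{% highlight java %}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical parallel test source emitting records with timestamps and watermarks.
public class TimestampedTestSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        for (long value = 0; running && value < 10; value++) {
            // hold the checkpoint lock while emitting, as required of checkpointed sources
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(value, value * 1000L);
                ctx.emitWatermark(new Watermark(value * 1000L));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{% endhighlight %}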
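
For the last remark, a restart can be provoked with a test-only mapper such as the following sketch; the class `FailOnceMapper` and its failure condition are made up for illustration. In the test, enable checkpointing and a restart strategy before executing, e.g. `env.enableCheckpointing(500)` and `env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))`.

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical test-only identity mapper that fails exactly once, forcing the
// job to restart from the last checkpoint.
public class FailOnceMapper implements MapFunction<Long, Long> {

    // static, so the flag survives operator re-instantiation after the restart
    private static volatile boolean failed = false;

    @Override
    public Long map(Long record) throws Exception {
        if (!failed) {
            failed = true;
            throw new RuntimeException("Artificial failure to test checkpointing");
        }
        return record;
    }
}
{% endhighlight %}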

{% top %}
