[FLINK-10986][tests] Add example on how to run a Jepsen test with Kafka
This closes apache#7173.
GJL committed Dec 10, 2018
1 parent c6d0446 commit be80fcc
Showing 3 changed files with 69 additions and 14 deletions.
32 changes: 21 additions & 11 deletions flink-jepsen/README.md
@@ -15,27 +15,37 @@ The faults that can be currently introduced to the Flink cluster include:
* Network partitions

There are many more properties other than job availability that could be
verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
verified but are not yet fully covered by this project, e.g., end-to-end exactly-once processing
semantics.

## Usage

### Setting up the Environment
See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
for how to set up the environment to run tests. The script under `docker/run-tests.sh` documents how to invoke
tests. The Flink job used for testing is located under
`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
root.
for details on how to set up the environment required to run the tests.
To simplify development, we have prepared Dockerfiles and a [Docker Compose](https://docs.docker.com/compose/) template
so that you can run the tests locally in containers (see Section [Docker](#usage-docker)).

### Running Tests
This project does not consist of a single runnable test but rather provides a parameterizable
test template. This allows the user to specify the cluster manager that Flink should run on, the
location of the high availability storage directory, the jobs to be submitted, etc.
The script under `docker/run-tests.sh` shows examples on how to specify and run tests.
By default, the example tests run the `DataStreamAllroundTestProgram`, which is located under
`flink-end-to-end-tests/flink-datastream-allround-test` of the Flink project root.
Before running the tests, you have to build the job first, and copy the resulting jar
(`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's root.
Also included in the examples is a more complicated scenario with two jobs that share a Kafka
topic. See the `run-tests.sh` script for details on how to enable and run this test.

### Docker

To simplify development, we have prepared Dockerfiles and a Docker Compose template
so that you can run the tests locally in containers. To build the images
and start the containers, simply run:
To build the images and start the containers, simply run:

$ cd docker
$ ./up.sh

After the containers started, open a new terminal window and run `docker exec -it jepsen-control bash`.
This should start one control node container and three containers that will be used as DB nodes.
After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
This will allow you to run arbitrary commands on the control node.
To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
which expects the number of test iterations, and a URI to a Flink distribution, e.g.,
27 changes: 24 additions & 3 deletions flink-jepsen/docker/run-tests.sh
@@ -36,18 +36,39 @@ common_jepsen_args+=(--ha-storage-dir hdfs:///flink
for i in $(seq 1 ${1})
do
echo "Executing run #${i} of ${1}"

# YARN session cluster
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/yarn-session.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/yarn-session.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --test-spec "${dockerdir}/test-specs/yarn-session.edn"

# YARN per-job cluster
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/yarn-job.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/yarn-job.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --test-spec "${dockerdir}/test-specs/yarn-job.edn"

# Mesos
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/mesos-session.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/mesos-session.edn"

lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/standalone-session.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-jobs --test-spec "${dockerdir}/standalone-session.edn"
echo
# Standalone
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/standalone-session.edn"
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-jobs --test-spec "${dockerdir}/test-specs/standalone-session.edn"

# Below is a test that uses Flink's exactly-once Kafka producer/consumer.
# The test submits two jobs:
#
# (1) DataGeneratorJob - Publishes data to a Kafka topic
# (2) StateMachineJob - Consumes data from the same Kafka topic, and validates exactly-once semantics
#
# To enable the test, you first need to build the flink-state-machine-kafka job jar,
# and copy the artifact to flink-jepsen/bin:
#
# git clone https://github.com/igalshilman/flink-state-machine-example
# cd flink-state-machine-example
# mvn clean package -pl flink-state-machine-kafka/flink-state-machine-kafka -am
# cp flink-state-machine-kafka/flink-state-machine-kafka/target/flink-state-machine-kafka-1.0-SNAPSHOT.jar /path/to/flink-jepsen/bin
#
# lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers-bursts --time-limit 60 --test-spec "${dockerdir}/test-specs/standalone-session-kafka.edn" --job-running-healthy-threshold 15

done
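
The loop above repeats the same `lein run test` invocation for each combination of nemesis generator and test spec. As a rough illustration only, the matrix for the two YARN specs could be expressed with a small helper. `jepsen_cmd` is a hypothetical name that is not part of this commit; the sketch only prints the command lines (a dry run) and leaves out `common_jepsen_args` for brevity.

```shell
#!/bin/sh
# Hypothetical helper (not part of the commit): print the lein command line
# for one nemesis generator ($1) and one test spec name ($2).
# Falls back to "." when dockerdir is not set.
jepsen_cmd() {
  printf 'lein run test --nemesis-gen %s --test-spec %s/test-specs/%s.edn\n' \
    "$1" "${dockerdir:-.}" "$2"
}

# Dry run: list every nemesis/spec combination used for the YARN clusters.
for spec in yarn-session yarn-job; do
  for nemesis in kill-task-managers kill-job-managers fail-name-node-during-recovery; do
    jepsen_cmd "$nemesis" "$spec"
  done
done
```

The script in this commit spells out every invocation instead, which keeps each run easy to comment out individually.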
24 changes: 24 additions & 0 deletions flink-jepsen/docker/test-specs/standalone-session-kafka.edn
@@ -0,0 +1,24 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

{:dbs [:hadoop :zookeeper :kafka :flink-standalone-session]
:jobs [{:job-jar "/jepsen/bin/flink-state-machine-kafka-1.0-SNAPSHOT.jar"
:job-args "--parallelism 1 --checkpointInterval 5000 --numKeys 1000 --topic kafka-test-topic --sleep 200 --semantic exactly-once --bootstrap.servers localhost:9092 --transaction.timeout.ms 600000 --checkpointDir hdfs:///flink-checkpoints"
:main-class "com.dataartisans.flink.example.eventpattern.DataGeneratorJob"}

{:job-jar "/jepsen/bin/flink-state-machine-kafka-1.0-SNAPSHOT.jar"
:job-args "--parallelism 1 --checkpointInterval 5000 --input-topic kafka-test-topic --bootstrap.servers localhost:9092 --checkpointDir hdfs:///flink-checkpoints --auto.offset.reset earliest"
:main-class "com.dataartisans.flink.example.eventpattern.StateMachineJob"}]}
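
Both jobs in this spec point at the same jar under `/jepsen/bin`, which has to be built and copied by hand before the test is enabled. A minimal pre-flight check, assuming the `:job-jar "..."` layout shown above, might look like the following sketch. `list_job_jars` and `check_job_jars` are hypothetical helpers that are not part of this commit, and the `grep`-based extraction is not a real EDN parser (paths containing spaces are not handled).

```shell
#!/bin/sh
# Hypothetical helpers (not part of the commit).

# Print each distinct :job-jar path referenced in the spec file given as $1.
list_job_jars() {
  grep -o ':job-jar "[^"]*"' "$1" | sed -e 's/^:job-jar "//' -e 's/"$//' | sort -u
}

# Return non-zero if any jar referenced by the spec ($1) does not exist.
# Assumes jar paths contain no whitespace.
check_job_jars() {
  missing=0
  for jar in $(list_job_jars "$1"); do
    if [ ! -f "$jar" ]; then
      echo "missing job jar: $jar" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

On the control node, this could be called as `check_job_jars "${dockerdir}/test-specs/standalone-session-kafka.edn"` before the corresponding `lein run test` line.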
