Skip to content

Commit

Permalink
[FLINK-10986][tests] Implement DB to setup Apache Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
GJL committed Dec 10, 2018
1 parent 1438051 commit c6d0446
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
3 changes: 3 additions & 0 deletions flink-jepsen/src/jepsen/flink/flink.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
[checker :as flink-checker]
[db :as fdb]
[hadoop :as hadoop]
[kafka :as kafka]
[mesos :as mesos]
[nemesis :as fn]]))

(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
(def kafka-dist-url "http:https://mirror.funkfreundelandshut.de/apache/kafka/2.0.1/kafka_2.11-2.0.1.tgz")
(def deb-zookeeper-package "3.4.9-3+deb8u1")
(def deb-mesos-package "1.5.0-2.0.2")
(def deb-marathon-package "1.6.322")
Expand All @@ -43,6 +45,7 @@
:flink-standalone-session (fdb/start-flink-db)
:flink-mesos-session (fdb/flink-mesos-app-master)
:hadoop (hadoop/db hadoop-dist-url)
:kafka (kafka/db kafka-dist-url)
:mesos (mesos/db deb-mesos-package deb-marathon-package)
:zookeeper (zk/db deb-zookeeper-package)})

Expand Down
99 changes: 99 additions & 0 deletions flink-jepsen/src/jepsen/flink/kafka.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http:https://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns jepsen.flink.kafka
(:require [clojure.tools.logging :refer :all]
[jepsen
[db :as db]
[control :as c]
[util :refer [meh]]]
[jepsen.control.util :as cu]
[jepsen.flink.zookeeper :as fzk]
[jepsen.flink.utils :as fu]))

(def install-dir "/opt/kafka")
(def application-log-dir "/opt/kafka/logs")
(def log-dirs "/opt/kafka/kafka-logs")
(def server-properties (str install-dir "/config/server.properties"))
(def start-script (str install-dir "/bin/kafka-server-start.sh"))
(def topic-script (str install-dir "/bin/kafka-topics.sh"))
(def stop-script (str install-dir "/bin/kafka-server-stop.sh"))

(defn- broker-id
[nodes node]
(.indexOf (sort nodes) node))

(defn- override-property
[name value]
(str "--override " name "=" value))

(defn- start-server-command
[{:keys [nodes] :as test} node]
(fu/join-space
start-script
"-daemon"
server-properties
(override-property "zookeeper.connect" (fzk/zookeeper-quorum test))
(override-property "broker.id" (broker-id nodes node))
(override-property "log.dirs" log-dirs)
(override-property "retention.ms" "1800000")))

(defn- start-server!
[test node]
(c/exec (c/lit (start-server-command test node))))

(defn- stop-server!
[]
(info "Stopping Kafka")
(cu/grepkill! "kafka"))

(defn- create-topic-command
[{:keys [nodes] :as test}]
(fu/join-space
topic-script
"--create"
"--topic kafka-test-topic"
(str "--partitions " (count nodes))
"--replication-factor 1"
"--zookeeper"
(fzk/zookeeper-quorum test)))

(defn- create-topic!
[test]
(info "Attempting to create Kafka topic")
(fu/retry (fn [] (c/exec (c/lit (create-topic-command test))))))

(defn- delete-kafka!
[]
(info "Deleting Kafka distribution and logs")
(c/exec :rm :-rf install-dir))

(defn db
[kafka-dist-url]
(reify db/DB
(setup! [_ test node]
(c/su
(cu/install-archive! kafka-dist-url install-dir)
(start-server! test node)
(when (zero? (broker-id (:nodes test) node))
(create-topic! test))))
(teardown! [_ _ _]
(c/su
(stop-server!)
(delete-kafka!)))
db/LogFiles
(log-files [_ _ _]
(fu/find-files! application-log-dir))))

0 comments on commit c6d0446

Please sign in to comment.