diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh index b2fd195b4da7d..04e82ac29e9e6 100755 --- a/flink-jepsen/docker/run-tests.sh +++ b/flink-jepsen/docker/run-tests.sh @@ -28,7 +28,7 @@ n2 n3 EOF -common_jepsen_args+=(--ha-storage-dir hdfs:///flink +common_jepsen_args+=( --tarball ${2} --ssh-private-key ~/.ssh/id_rsa --nodes-file ${dockerdir}/nodes) diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj index 6bac982622a26..c636fcc1ff30d 100644 --- a/flink-jepsen/src/jepsen/flink/db.clj +++ b/flink-jepsen/src/jepsen/flink/db.clj @@ -36,13 +36,13 @@ (def taskmanager-slots 3) -(defn flink-configuration +(defn- default-flink-configuration [test node] {:high-availability "zookeeper" :high-availability.zookeeper.quorum (zookeeper-quorum test) - :high-availability.storageDir (str (:ha-storage-dir test) "/ha") + :high-availability.storageDir "hdfs:///flink/ha" :jobmanager.rpc.address node - :state.savepoints.dir (str (:ha-storage-dir test) "/savepoints") + :state.savepoints.dir "hdfs:///flink/savepoints" :rest.address node :rest.port 8081 :rest.bind-address "0.0.0.0" @@ -52,6 +52,12 @@ :state.backend.local-recovery "true" :taskmanager.registration.timeout "30 s"}) +(defn flink-configuration + [test node] + (let [additional-config (-> test :test-spec :flink-config)] + (merge (default-flink-configuration test node) + additional-config))) + (defn write-configuration! "Writes the flink-conf.yaml to the flink conf directory" [test node] diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj index 6ff73e43cc541..6bd45c08ede86 100644 --- a/flink-jepsen/src/jepsen/flink/flink.clj +++ b/flink-jepsen/src/jepsen/flink/flink.clj @@ -120,7 +120,6 @@ :parse-fn read-test-spec :validate [#(->> % :dbs (map dbs) (every? (complement nil?))) (str "Invalid :dbs specification. " (keys->allowed-values-help-text dbs))]] - [nil "--ha-storage-dir DIR" "high-availability.storageDir"] [nil "--nemesis-gen GEN" (str "Which nemesis should be used?" (keys->allowed-values-help-text fn/nemesis-generator-factories)) :parse-fn keyword diff --git a/flink-jepsen/test/jepsen/flink/db_test.clj b/flink-jepsen/test/jepsen/flink/db_test.clj new file mode 100644 index 0000000000000..236577b8a07d1 --- /dev/null +++ b/flink-jepsen/test/jepsen/flink/db_test.clj @@ -0,0 +1,29 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns jepsen.flink.db-test + (:require [clojure.test :refer :all]) + (:require [jepsen.flink.db :refer :all])) + +(deftest flink-configuration-test + (testing "High availability is zookeeper by default" + (is (= "zookeeper" (:high-availability (flink-configuration {} "n1"))))) + + (testing "Default configuration can be overridden" + (let [expected-config-value "NONE" + custom-flink-config {:high-availability expected-config-value} + test {:test-spec {:flink-config custom-flink-config}}] + (is (= expected-config-value (:high-availability (flink-configuration test "n1")))))))