Skip to content

Commit

Permalink
[hotfix] Move code from nemesis to generator
Browse files Browse the repository at this point in the history
  • Loading branch information
GJL committed Dec 10, 2018
1 parent 1dd68ec commit e6d98a3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 57 deletions.
5 changes: 3 additions & 2 deletions flink-jepsen/src/jepsen/flink/flink.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
[client :refer :all]
[checker :as flink-checker]
[db :as fdb]
[generator :as fg]
[hadoop :as hadoop]
[kafka :as kafka]
[mesos :as mesos]
Expand Down Expand Up @@ -87,9 +88,9 @@
job-running-healthy-threshold
job-recovery-grace-period)
:generator (let [stop (atom nil)]
(->> (fn/stoppable-generator stop (client-gen))
(->> (fg/stoppable-generator stop (client-gen))
(gen/nemesis
(fn/stop-generator stop
(fg/stop-generator stop
((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
job-running-healthy-threshold
job-recovery-grace-period))))
Expand Down
61 changes: 60 additions & 1 deletion flink-jepsen/src/jepsen/flink/generator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

(ns jepsen.flink.generator
(:require [jepsen.util :as util]
[jepsen.generator :as gen]))
[jepsen.generator :as gen]
[jepsen.flink.checker :as flink-checker]))

(gen/defgenerator TimeLimitGen
[dt source deadline-atom]
Expand All @@ -37,3 +38,61 @@
(defn time-limit
[dt source]
(TimeLimitGen. dt source (atom nil)))

(defn stoppable-generator
"Given an atom and a source generator, returns a generator that stops emitting operations from
the source if the atom is set to true."
[stop source]
(reify gen/Generator
(op [_ test process]
(if @stop
nil
(gen/op source test process)))))

(defn- take-last-with-default
[n default coll]
(->>
(cycle [default])
(concat (reverse coll))
(take n)
(reverse)))

(defn- inc-by-factor
[n factor]
(assert (>= factor 1))
(int (* n factor)))

(defn stop-generator
"Returns a generator that emits operations from a given source generator. If the source is
exhausted and either job-recovery-grace-period has passed or the job has been running
job-running-healthy-threshold times consecutively, the stop atom is set to true."
[stop source job-running-healthy-threshold job-recovery-grace-period]
(gen/concat source
(let [t (atom nil)]
(reify gen/Generator
(op [_ test process]
(when (nil? @t)
(compare-and-set! t nil (util/relative-time-nanos)))
(let [history (->>
(:active-histories test)
deref
first
deref)
job-running-history (->>
history
(filter (fn [op] (>= (- (:time op) @t) 0)))
(flink-checker/all-jobs-running?-history)
(take-last-with-default job-running-healthy-threshold false))]
(if (or
(every? true? job-running-history)
(> (util/relative-time-nanos) (+ @t
(util/secs->nanos
(inc-by-factor
job-recovery-grace-period
1.1)))))
(do
(reset! stop true)
nil)
(do
(Thread/sleep 1000)
(recur test process)))))))))
54 changes: 0 additions & 54 deletions flink-jepsen/src/jepsen/flink/nemesis.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
[util :as ju]]
[jepsen.control.util :as cu]
[jepsen.flink.client :refer :all]
[jepsen.flink.checker :as flink-checker]
[jepsen.flink.generator :as fgen]
[jepsen.flink.hadoop :as fh]
[jepsen.flink.zookeeper :refer :all]))
Expand Down Expand Up @@ -70,59 +69,6 @@

;;; Generators

(defn stoppable-generator
[stop source]
(reify gen/Generator
(op [gen test process]
(if @stop
nil
(gen/op source test process)))))

(defn take-last-with-default
[n default coll]
(->>
(cycle [default])
(concat (reverse coll))
(take n)
(reverse)))

(defn- inc-by-factor
[n factor]
(assert (>= factor 1))
(int (* n factor)))

(defn stop-generator
[stop source job-running-healthy-threshold job-recovery-grace-period]
(gen/concat source
(let [t (atom nil)]
(reify gen/Generator
(op [_ test process]
(when (nil? @t)
(compare-and-set! t nil (ju/relative-time-nanos)))
(let [history (->>
(:active-histories test)
deref
first
deref)
job-running-history (->>
history
(filter (fn [op] (>= (- (:time op) @t) 0)))
(flink-checker/all-jobs-running?-history)
(take-last-with-default job-running-healthy-threshold false))]
(if (or
(every? true? job-running-history)
(> (ju/relative-time-nanos) (+ @t
(ju/secs->nanos
(inc-by-factor
job-recovery-grace-period
1.1)))))
(do
(reset! stop true)
nil)
(do
(Thread/sleep 1000)
(recur test process)))))))))

(defn kill-taskmanagers-gen
[time-limit dt op]
(fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
Expand Down

0 comments on commit e6d98a3

Please sign in to comment.