Clojure background job queue on top of PostgreSQL 9.5.
It allows creating background jobs, placing those jobs on multiple queues, and processing them later. A background job can be any named Clojure function.
The project is mostly inspired by Que, Resque and Celery.
- Durability: the queue survives app restarts because it is stored in a PostgreSQL table. All done and failed jobs are left in the table so that the user can inspect, retry or purge them manually at any time.
- Embedment: a queue consumption worker can be easily started in a background thread.
- Parallelism: the queue can be consumed by several threads on different machines to better utilize multiple CPU cores. Parallel consumption is based on the FOR UPDATE/SKIP LOCKED feature introduced in PostgreSQL 9.5.
- Transactional guarantees:
  - Every job is executed inside its own database transaction.
  - If a job is marked done, then all its database statements have been committed.
  - If an exception is thrown inside a job, all of the job's database statements are rolled back and the job is marked failed. Thus, if a job is marked new or failed, none of its database statements have been committed.
  - Scheduling can be done in the same transaction that commits the data needed by the job, so a worker will not pick up the job before that data is committed.
- Multiple queues: jobs can be scheduled to different queues/tags. E.g. you can schedule heavy jobs into a separate "slow" queue/worker so that they do not block more important jobs from a "light" queue.
- Fewer dependencies: if you already use PostgreSQL, a separate queue (Redis, RabbitMQ, etc.) is another moving part that can break.
- Small: the implementation with docstrings is less than 300 LOC.
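The parallelism point above relies on row-level locking. Below is a minimal sketch of how a worker might claim one job with FOR UPDATE SKIP LOCKED. It is not Byplay's actual query: the table name example_jobs and its columns are assumptions made only for illustration.

```clojure
(require '[clojure.string :as str])

(defn claim-job-sql
  "Returns a parameterized query that locks and returns at most one new job
  from any of `n-queues` queues. Rows already locked by other transactions
  are skipped instead of waited on."
  [n-queues]
  (str "SELECT id, job, args, queue, state"
       " FROM example_jobs"                       ; hypothetical table name
       " WHERE state = 'new'"
       " AND queue IN (" (str/join ", " (repeat n-queues "?")) ")"
       " ORDER BY id"
       " LIMIT 1"
       " FOR UPDATE SKIP LOCKED"))

(println (claim-job-sql 2))
```

Two workers running such a query in separate transactions would receive different rows, because each one skips the row the other has locked.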
It hasn't been proven yet, but Byplay may experience the problem described in the Que docs:
Que's job table undergoes a lot of churn when it is under high load, and like any heavily-written table, is susceptible to bloat and slowness if Postgres isn't able to clean it up. The most common cause of this is long-running transactions, so it's recommended to try to keep all transactions against the database housing Que's job table as short as possible. This is good advice to remember for any high-activity database, but bears emphasizing when using tables that undergo a lot of writes.
This PostgreSQL issue is explained in more detail in the article "Postgres Job Queues & Failure By MVCC".
Alpha version. The library hasn't been used in production yet. But it has a nice suite of tests.
Add dependency to your project:
[byplay "0.4.0"]
Require a namespace:
(ns my-app.core
  (:require
    [byplay.core :as b]
    ,,,))
On application start, set up the Byplay table and the accompanying indexes in the database (it's safe to call this function more than once):
(b/migrate jdbc-conn)
Here jdbc-conn is a "raw" JDBC connection. There are different ways to obtain such an instance:
- Via funcool/clojure.jdbc JDBC wrapper:
(with-open [conn (jdbc.core/connection dbspec)]
  (let [jdbc-conn (jdbc.proto/connection conn)]
    ,,,))
- Via clojure/java.jdbc JDBC wrapper:
(clojure.java.jdbc/with-db-connection [conn db-spec]
  (let [jdbc-conn (clojure.java.jdbc/db-connection conn)]
    ,,,))
- Via JDBC datasource (e.g. HikariCP):
(with-open [jdbc-conn (.getConnection datasource)]
  ,,,)
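For the datasource variant, a pooled datasource could be created roughly as follows. This is only a sketch that assumes the HikariCP library is on the classpath; the JDBC URL, credentials and pool size are placeholders, not values taken from Byplay.

```clojure
(import 'com.zaxxer.hikari.HikariDataSource)

;; Placeholder connection settings; adjust them for your environment.
;; The pool is initialized lazily, on the first .getConnection call.
(def datasource
  (doto (HikariDataSource.)
    (.setJdbcUrl "jdbc:postgresql://localhost:5432/myapp")
    (.setUsername "myapp")
    (.setPassword "secret")
    (.setMaximumPoolSize 4)))
```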
Define a job function:
(defn my-job
  [ctx x y z]
  (do-something-in-job-transaction1 (:jdbc-conn ctx))
  ; or if you use funcool/clojure.jdbc JDBC wrapper:
  (do-something-in-job-transaction2 (:conn ctx))
  ,,,)
Here (:jdbc-conn ctx) is a JDBC connection with the current transaction in progress and (:conn ctx) is the same connection wrapped by a funcool/clojure.jdbc connection instance.
Put the job into the :default queue:
(b/schedule jdbc-conn #'my-job 1 2 3)
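Scheduling can share a transaction with the data the job depends on, so the job becomes visible to workers only once that data is committed. The sketch below assumes the clojure/java.jdbc wrapper; the users table, the register-user! function and the send-welcome-email job are hypothetical names used only for illustration.

```clojure
(require '[clojure.java.jdbc :as jdbc]
         '[byplay.core :as b])

(defn send-welcome-email            ; hypothetical job function
  [ctx email]
  ,,,)

(defn register-user!                ; hypothetical application function
  [db-spec email]
  (jdbc/with-db-transaction [tx db-spec]
    ;; the row and the job are committed together, or not at all
    (jdbc/insert! tx :users {:email email})
    (b/schedule (jdbc/db-connection tx) #'send-welcome-email email)))
```

If the insert fails and the transaction rolls back, the scheduled job is rolled back with it.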
Explicitly specify another queue using schedule-to:
(b/schedule-to jdbc-conn :my-queue #'my-job 1 2 3)
Or specify the queue in the job metadata under the :byplay.core/queue key:
(defn ^{::b/queue :my-queue} my-job
  [ctx x y z]
  ,,,)

(b/schedule jdbc-conn #'my-job 1 2 3)
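To see why metadata on the var is enough, here is a small sketch of how a queue name can be read back from a job var. The queue-of helper is hypothetical and is not part of Byplay's API; it only illustrates the lookup, with :default as the assumed fallback.

```clojure
(defn ^{:byplay.core/queue :my-queue} my-job
  [ctx x y z]
  ,,,)

(defn other-job                     ; no queue metadata
  [ctx]
  ,,,)

(defn queue-of
  "Hypothetical helper: reads the queue from the var's metadata,
  falling back to :default when the key is absent."
  [job-var]
  (get (meta job-var) :byplay.core/queue :default))

(queue-of #'my-job)     ; => :my-queue
(queue-of #'other-job)  ; => :default
```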
Define a funcool/clojure.jdbc database specification, e.g.:
(def dbspec {:classname   "org.postgresql.Driver"
             :subprotocol "postgresql"
             :subname     "//localhost:5432/myapp"})
Start a background worker with 2 concurrent work threads, each polling the specified queue for a new job every 5 seconds:
(b/start (b/new-worker dbspec {:threads-num      2
                               :queues           [:my-queue]
                               :polling-interval 5000
                               :on-fail          (fn on-fail
                                                   [worker exc {:keys [id job args queue state] :as _job}]
                                                   ,,,)}))
The on-fail function is called if an exception is thrown from the job.
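As an illustration, an on-fail handler might simply log the failure. The sketch below follows the argument shape shown above; the function names are hypothetical, and the exact types of the id and job values in the job map are an assumption.

```clojure
(defn failure-message
  "Builds a one-line description of a failed job."
  [^Throwable exc {:keys [id job queue]}]
  (format "Job %s (%s) on queue %s failed: %s"
          id job queue (.getMessage exc)))

(defn log-failed-job
  "Candidate :on-fail handler: prints the failure to stderr."
  [_worker exc job]
  (binding [*out* *err*]
    (println (failure-message exc job))))
```

It could then be passed to the worker options as :on-fail log-failed-job.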
You can ask a worker to finish all currently running jobs and stop polling the database with the interrupt method. For example, this is how a worker can be gracefully stopped in an application shutdown hook:
(.addShutdownHook (Runtime/getRuntime)
                  (Thread. #(do
                              ; stop the worker before other services (to not break jobs in progress)
                              (doto worker b/interrupt b/join)
                              ; stop other services
                              ,,,)))
Jobs should be idempotent whenever possible, because in rare cases a job may be started more than once. E.g. a worker may die in the middle of a job execution, leaving the job in the new state.
Thanks to the transactional guarantees, if a job only updates the database, you don't have to worry about this problem. Just don't forget to use the connection from the job context.
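A job that touches only the database through the context connection could look like the sketch below. It uses plain JDBC interop on (:jdbc-conn ctx); the orders table, its columns and the mark-order-paid job are hypothetical. The status check in the WHERE clause also makes a repeated run a no-op.

```clojure
(defn mark-order-paid               ; hypothetical job function
  [ctx order-id]
  ;; The statement runs on the job's own connection, so it is committed
  ;; or rolled back together with the job's state change.
  (with-open [stmt (.prepareStatement
                     ^java.sql.Connection (:jdbc-conn ctx)
                     "UPDATE orders SET status = 'paid' WHERE id = ? AND status <> 'paid'")]
    (.setLong stmt 1 order-id)
    (.executeUpdate stmt)))
```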
Use a database connection pool (see the funcool/clojure.jdbc docs). Otherwise Byplay will create a new connection to the database on every poll.
If you schedule a job and then rename its namespace or function, the worker won't find the job var and will fail the job. Also be careful when changing job args.
An exception can also occur in a worker thread outside of a job function. By default such exceptions silently kill the background thread, so it's good practice to detect them explicitly with Thread/setDefaultUncaughtExceptionHandler.
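A minimal sketch of such a handler is shown below. It only records and prints the exception; the last-uncaught atom and the install-uncaught-handler! function are hypothetical names, and a real application would probably forward the error to its logging or alerting system instead.

```clojure
(def last-uncaught (atom nil))

(defn install-uncaught-handler!
  "Installs a JVM-wide handler for exceptions that escape any thread."
  []
  (Thread/setDefaultUncaughtExceptionHandler
    (reify Thread$UncaughtExceptionHandler
      (uncaughtException [_ thread ex]
        ;; remember the failure and report it on stderr
        (reset! last-uncaught {:thread  (.getName thread)
                               :message (.getMessage ex)})
        (binding [*out* *err*]
          (println "Uncaught exception in thread"
                   (.getName thread) "-" (.getMessage ex)))))))

(install-uncaught-handler!)
```

Note that the handler is global to the JVM, so it also sees uncaught exceptions from threads that have nothing to do with the worker.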
More information can be found at the project site:
Copyright © 2016 Yuri Govorushchenko.
Released under an MIT license.