Robust job processing in Elixir, backed by modern PostgreSQL.
Reliable,
observable and loaded with enterprise grade features.
Oban's primary goals are reliability, consistency and observability. It is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection. You can leave your application running indefinitely without worrying about jobs being lost or orphaned due to crashes.
Advantages over in-memory, mnesia, Redis and RabbitMQ based tools:
- Fewer Dependencies β If you are running a web app there is a very good chance that you're running on top of a RDBMS. Running your job queue within PostgreSQL minimizes system dependencies and
- Transactional Control β Enqueue a job along with other database changes, ensuring that everything is committed or rolled back atomically.
- Database Backups β Jobs are stored inside of your primary database, which means they are backed up together with the data that they relate to.
Advanced features and advantages over other RDBMS based tools:
- Isolated Queues β Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that a jobs in a single slow queue can't back up other faster queues.
- Queue Control β Queues can be paused, resumed and scaled independently at runtime.
- Job Killing β Jobs can be killed in the middle of execution regardless of
which node they are running on. This stops the job at once and flags it as
discarded
. - Triggered execution β Database triggers ensure that jobs are dispatched as soon as they are inserted into the database.
- Scheduled Jobs β Jobs can be scheduled at any time in the future, down to the second.
- Job Safety β When a process crashes or the BEAM is terminated executing jobs aren't lostβthey are quickly recovered by other running nodes or immediately when the node is restarted.
- Historic Metrics β After a job is processed the row is not deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.
- Node Metrics β Every queue broadcasts metrics during runtime. These are used to monitor queue health across nodes.
- Queue Draining β Queue shutdown is delayed so that slow jobs can finish executing before shutdown.
- Telemetry Integration β Job life-cycle events are emitted via Telemetry integration. This enables simple logging, error reporting and health checkups without plug-ins.
Oban is published on Hex. Add it to your list of
dependencies in mix.exs
:
def deps do
[
{:oban, "~> 0.2"}
]
end
Then run mix deps.get
to install Oban and its dependencies, including
Ecto, Jason and Postgrex.
After the packages are installed you must create a database migration to
add the oban_jobs
table to your database:
mix ecto.gen.migration add_oban_jobs_table
Open the generated migration in your editor and delegate the up
and down
functions to Oban.Migrations
:
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
defdelegate up, to: Oban.Migrations
defdelegate down, to: Oban.Migrations
end
Finally, run the migration to create the table:
mix ecto.migrate
Next see Usage for how to integrate Oban into your application and start defining jobs!
Oban isn't an application and won't be started automatically. It is started by a
supervisor that must be included in your application's supervision tree. All of
your configuration is passed into the Oban
supervisor, allowing you to
configure Oban like the rest of your application.
# confg/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [default: 10, events: 50, media: 20]
# lib/my_app/application.ex
defmodule MyApp.Application do
@moduledoc false
use Application
alias MyApp.{Endpoint, Repo}
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, Application.get_env(:my_app, Oban)}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
end
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
queues: [default: 10, mailers: 20, events: 50, media: 5]
There isn't a limit to the number of queues or how many jobs may execute concurrently. Here are a few caveats and guidelines:
- Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (i.e. database connections) to handle the concurrent load.
- Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
- Be careful how many concurrent jobs make expensive system calls (i.e. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
Worker modules do the work of processing a job. At a minimum they must define a
perform/1
function, which is called with an args
map.
Define a worker to process jobs in the events
queue:
defmodule MyApp.Workers.Business do
use Oban.Worker, queue: "events", max_attempts: 10
@impl Oban.Worker
def perform(%{"id" => id}) do
model = MyApp.Repo.get(MyApp.Business.Man, id)
IO.inspect(model)
end
end
The return value of perform/1
doesn't matter and is entirely ignored. If the
job raises an exception or throws an exit then the error will be reported and
the job will be retried (provided there are attempts remaining).
Jobs are simply Ecto strucs and are enqueued by inserting them into the
database. Here we insert a job into the default
queue and specify the worker
by module name:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: MyApp.Worker)
|> MyApp.Repo.insert()
For convenience and consistency all workers implement a new/2
function that
converts an args map into a job changeset suitable for inserting into the
database:
%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> MyApp.Repo.insert()
The worker's defaults may be overridden by passing options:
%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> MyApp.Repo.insert()
Jobs may be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Workers.Business.new(schedule_in: 5)
|> MyApp.Repo.insert()
Oban doesn't provide any special mechanisms for testing. However, here are a few recommendations for running tests in isolation.
-
Set a high
poll_interval
in your test configuration. This effectively stops queues from polling and will prevent inserted jobs from executing.config :my_app, Oban, poll_interval: :timer.minutes(30)
-
Be sure to use the Ecto Sandbox for testing. Oban makes use of database pubsub events to dispatch jobs, but pubsub events never fire within a transaction. Since sandbox tests run within a transaction no events will fire and jobs won't be dispatched.
config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox
To run the Oban test suite you must have PostgreSQL 10+ running locally with a
database named oban_test
. Follow these steps to create the database, create
the database and run all migrations:
# Create the database
MIX_ENV=test mix ecto.create -r Oban.Test.Repo
# Run the base migration
MIX_ENV=test mix ecto.migrate -r Oban.Test.Repo
To ensure a commit passes CI you should run mix ci
locally, which executes the
following commands:
- Check formatting (
mix format --check-formatted
) - Lint with Credo (
mix credo --strict
) - Run all tests (
mix test
) - Run Dialyzer (
mix dialyzer --halt-exit-status
)