Skip to content

radiospiel/postqueue

 
 

Repository files navigation

Postqueue

Intro

The postqueue gem implements a simple to use queue on top of postgresql. Note that while a queue like this is typically used in a job queueing scenario, this document does not talk about jobs, it talks about queue items; it also does not schedule a job, it enqueues an item, and it does not executes a job, it processes queue items.

Why building an additional queue implementation? Compared to delayed_job or the other usual suspects postqueue implements these features:

  • The item structure is intentionally kept super simple: an item is described by an op field - a string - and an id field, an integer. In a typical usecase a queue item would describe an operation on a specific entity, where op names both the operation and the entity type and the id field would describe the individual entity.

  • With such a simplistic item structure the queue itself can be searched or otherwise evaluated using SQL. This also allows for skipping duplicate entries when enqueuing items (managed via a duplicate: argument when enqueuing) and for batch processing multple items in one go.

  • With data being kept in a Postgresql database processing provides transactional semantics: an item failing to process stays in the queue. Error handling is kept simpe to a strategy of rescheduling items up to a specific maximum number of processing attemps.

Please be aware that postqueue is using the SELECT .. FOR UPDATE SKIP LOCKED Postgresql syntax, and therefore needs at least PostgresQL >= 9.5.

Basic usage

queue = Postqueue.new
queue.enqueue op: "product/reindex", entity_id: [12,13,14,15]
queue.on "product/reindex" do |op, entity_ids|
  Product.index_many(Product.where(id: entity_ids))
end

queue.process

The process call will select a number of queue items for processing. They will all have the same op attribute. The callback will receive the op attribute and the entity_ids of all queue entries selected for processing. The processing method will return the number of processed items.

If no callback is given the matching items are only removed from the queue without any processing.

Postqueue.process also accepts the following arguments:

  • op: only process entries with this op value;
  • batch_size: maximum number of items to process in one go.

Example:

Postqueue.process(op: 'product/reindex', batch_size: 10) do |op, entity_ids|
  # only handle up to 10 "product/reindex" entries
end

If the block raises an exception the queue will postpone processing these entries by an increasing amount of time, up until queue.max_attempts failed attempts. That value defaults to 5.

If the queue is empty or no matching queue entry could be found, Postqueue.process returns 0.

Advanced usage

Concurrency

Postqueue implements the following concurrency guarantees:

  • catastrophic DB failure and communication breakdown aside a queue item which is enqueued will eventually be processed successfully exactly once;
  • multiple consumers can work in parallel.

Note that you should not share a Postqueue ruby object across threads - instead you should create process objects with the identical configuration.

Idempotent operations

When enqueueing items duplicate idempotent operations are not enqueued. Whether or not an operation should be considered idempotent is defined when configuring the queue:

 Postqueue.new do |queue|
   queue.on "idempotent", idempotent: true do ]op, entity_ids|
     # .. handle queue item
   end
 end

Processing a single entry

Postqueue implements a shortcut to process only a single entry. Under the hood this calls Postqueue.process with batch_size set to 1:

queue.process_one

Note that even though process_one will only ever process a single entry the entity_ids parameter to the callback is still an array (with a single ID entry in that case).

Migrating

Postqueue comes with migration helpers:

# set up a table for use with postqueue.
Postqueue.migrate!(table_name = "postqueue")

# set up a table for use with postqueue.
Postqueue.unmigrate!(table_name = "postqueue")

You can also set up your own table, as long as it is compatible.

To use a non-default table or a non-default database, change the item_class attribute of the queue:

Postqueue.new do |queue|
  queue.item_class = MyItemClass
end

MyItemClass should inherit from Postqueue::Item and use the same or a compatible database structure.

Batch processing

Often queue items can be batched together for a performant operation. To allow batch processing for some items, configure the Postqueue to either set a default_batch_size or an operation-specific batch_size:

Postqueue.new do |queue|
  queue.default_batch_size = 100
  queue.on "batchable", batch_size: 10 do
    ...
  end
end

Test mode

Postqueue works usually in an async mode: queue items that are enqueued are kept in a queue, and must be picked up later explicitely for processing (via one of the process, process_one or process_until_empty methods).

During unit tests it is likely preferrable to process queue items in synchronous fashion - if you are interested in actual processing - or, at least, in a mode which validates that the op value is valid (that means that a handler is registered for that op). You can change the processing mode via

# can be :sync, :async, :verify
Postqueue.processing = :sync

You can also change the processing mnode on a queue-by-queue base:

Postqueue.new do |queue|
  queue.processing = :sync
end

Installation

Add this line to your application's Gemfile:

gem 'postqueue'

And then execute:

$ bundle

Or install it yourself as:

$ gem install postqueue

Development

After checking out the repo, run bin/setup to install dependencies. Make sure you have a local postgresql implementation of at least version 9.5. Add a postqueue user with a postqueue password, and create a postqueue_test database for it. The script ./scripts/prepare_pg can be somewhat helpful in establishing that.

Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install.

To release a new version, run ./scripts/release, which will bump the version number, create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/postqueue.

License

The gem is available as open source under the terms of the MIT License.

About

A simple postgres-based queue

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Ruby 99.1%
  • Shell 0.9%