Skip to content

kpassapk/resque-bus

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Resque Bus

This gem uses Redis and Resque to allow simple asynchronous communication between apps.

Example

Application A can publish an event

# config
ResqueBus.redis = "192.168.1.1:6379"

# business logic
ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

# or do it later
ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

Application B is subscribed to events

# config
ResqueBus.redis = "192.168.1.1:6379"

# initializer
ResqueBus.dispatch("app_b") do
  # processes event on app_b_default queue
  # subscribe is short-hand to subscribe to your 'default' queue and this block with process events with the name "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end
  
  # processes event on app_b_critical queue
  # critical is short-hand to subscribe to your 'critical' queue and this block with process events with the name "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end

  # you can pass any queue name you would like to process from as well IE: `banana "peeled" do |attributes|`
  
  # and regexes work as well. note that with the above configuration along with this regex,
  # the following as well as the corresponding block above would both be executed
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end
  
  # the above all filter on just the event_type, but you can filter on anything
  # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do
    Mixpanel.homepage_action!(attributes["action"])
  end
end

Applications can also subscribe within classes using the provided Subscriber module.

class SimpleSubscriber
  include ResqueBus::Subscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end

The following is equivalent to the original initializer and shows more options:

class OtherSubscriber
  include ResqueBus::Subscriber
  application :app_b

  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"

  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end

  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  def homepage_method
    Mixpanel.homepage_action!(attributes["action"])
  end
end

Note: This subscribes when this class is loaded, so it needs to be in your load or otherwise referenced/required during app initialization to work properly.

Commands

Each app needs to tell Redis about its subscriptions:

$ rake resquebus:subscribe

The subscription block is run inside a Resque worker which needs to be started for each app.

$ rake resquebus:setup resque:work

The incoming queue also needs to be processed on a dedicated or all the app servers.

$ rake resquebus:driver resque:work

If you want retry to work for subscribing apps, you should run resque-scheduler

$ rake resque:scheduler

Heartbeat

We've found it useful to have the bus act like cron, triggering timed jobs throughout the system. Resque Bus calls this a heartbeat. It uses resque-scheduler to trigger the events. You can enable it in your Rakefile.

# resque.rake
namespace :resque do
  task :setup => [:environment] do
    ResqueBus.heartbeat!
  end
end

Or add it to your schedule.yml directly

resquebus_heartbeat:
  cron: "* * * * *"
  class: "::ResqueBus::Heartbeat"
  queue: resquebus_incoming
  description: "I publish a heartbeat_minutes event every minute"

It is the equivalent of doing this every minute

seconds = minutes * (60) hours = minutes / (60) days = minutes / (60*24)

now = Time.at(seconds)

attributes = {}

now = Time.now
seconds = now.to_i
ResqueBus.publish("hearbeat_minutes", {
  "epoch_seconds" => seconds,
  "epoch_minutes" => seconds / 1.minute,
  "epoch_hours"   => seconds / 1.hour,
  "epoch_days"    => seconds / 1.day,
  "minute"        => now.min
  "hour"          => now.hour
  "day"           => now.day
  "month"         => now.month
  "year"          => now.year
  "yday"          => now.yday
  "wday"          => now.wday
})

This allows you do something like this:

ResqueBus.dispatch("app_c") do
  # runs at 10:20, 11:20, etc
  subscribe "once_an_hour", 'bus_event_type' => 'heartbeat_minutes', 'minute' => 20 do |attributes|
    Sitemap.generate!
  end
  
  # runs every five minutes
  subscribe "every_five_minutes", 'bus_event_type' => 'heartbeat_minutes' do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    HealthCheck.run!
  end
  
  # runs at 8am on the first of every month
  subscribe "new_month_morning", 'bus_event_type' => 'heartbeat_minutes', 'day' => 1, hour' => 8, 'minute' => 0,  do |attributes|
    next unless attributes["epoch_minutes"] % 5 == 0
    Token.old.expire!
  end
end

Compatibility

ResqueBus can live along side another instance of Resque that points at a different Redis server.

# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis = "192.168.1.1:6379"

If no Redis instance is given specifically, ResqueBus will use the Resque one.

# config
Resque.redis = "192.168.1.0:6379"

That will use the default (resque) namespace which can be helpful for using the tooling. Conflict with queue names are unlikely. You can change the namespace if you like though.

# config
Resque.redis = "192.168.1.0:6379"
ResqusBus.redis.namespace = :get_on_the_bus

Local Mode

For development, a local mode is also provided and is specified in the configuration.

# config
ResqueBus.local_mode = :standalone
or 
ResqueBus.local_mode = :inline

Standalone mode does not require a separate resquebus:driver task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application specific queue. A separate resquebus:work task does still need to be run to process these events

Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.

TODO

  • There are a few spots in the code with TODO notes
  • Make this not freak out in development without Redis or when Redis is down
  • We might not actually need to publish in tests
  • Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines
  • Allow calling resquebus:setup and resquebus:driver together (append to ENV['QUEUES'], don't replace it)

Copyright (c) 2011 Brian Leonard, released under the MIT license

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Ruby 99.9%
  • Shell 0.1%