Skip to content

dunse/node-resque

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

52 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

node-resque

Delayed Tasks in nodejs. A very opinionated but compatible API with resque and resque scheduler

Nodei stats

Build Status

Usage

I learn best by examples:

/////////////////////////
// REQUIRE THE PACKAGE //
/////////////////////////

var NR = require("node-resque");

///////////////////////////
// SET UP THE CONNECTION //
///////////////////////////

var connectionDetails = {
  host:      "127.0.0.1",
  password:  "",
  port:      6379,
  database:  0,
}

//////////////////////////////
// DEFINE YOUR WORKER TASKS //
//////////////////////////////

var jobs = {
  "add": {
    perform: function(a,b,callback){
      var answer = a + b; 
      callback(answer);
    },
  },
  "subtract": {
    perform: function(a,b,callback){
      var answer = a - b; 
      callback(answer);
    },
  },
};

////////////////////
// START A WORKER //
////////////////////

var worker = new NR.worker({connection: connectionDetails, queues: ['math']}, jobs, function(){
  worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers
  worker.start();
});

///////////////////////
// START A SCHEDULER //
///////////////////////

var scheduler = new NR.scheduler({connection: connectionDetails}, function(){
  scheduler.start();
});

/////////////////////////
// REGESTER FOR EVENTS //
/////////////////////////

worker.on('start',           function(){ console.log("worker started"); })
worker.on('end',             function(){ console.log("worker ended"); })
worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); })
worker.on('poll',            function(queue){ console.log("worker polling " + queue); })
worker.on('job',             function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); })
worker.on('reEnqueue',       function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
worker.on('success',         function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
worker.on('error',           function(queue, job, error){ console.log("job failed " + queue + " " + JSON.stringify(job) + " >> " + error); })
worker.on('pause',           function(){ console.log("worker paused"); })

scheduler.on('start',             function(){ console.log("scheduler started"); })
scheduler.on('end',               function(){ console.log("scheduler ended"); })
scheduler.on('poll',              function(){ console.log("scheduler polling"); })
scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); })
scheduler.on('transferred_job',    function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); })

////////////////////////
// CONNECT TO A QUEUE //
////////////////////////

var queue = new NR.queue({connection: connectionDetails}, jobs, function(){
  queue.enqueue('math', "add", [1,2]);
  queue.enqueue('math', "add", [2,3]);
  queue.enqueueIn(3000, 'math', "subtract", [2,1]);
});

Configutation Options:

new queue requires only the "queue" variable to be set. You can also pass the jobs hash to it.

new worker has some additonal options:

options = {
  looping: true,
  timeout: 5000,
  queues:  "*",
  name:    os.hostname() + ":" + process.pid
}

The configuration hash passed to new worker, new scheduler or new queue can also take a connection option.

var connectionDetails = {
  host:      "127.0.0.1",
  password:  "",
  port:      6379,
  database:  0,
  namespace: "resque",
}

var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, function(){
  worker.start();
});

Notes

  • Be sure to call worker.end() before shutting down your application if you want to properly clear your worker status from resque
  • When ending your application, be sure to allow your workers time to finsih what they are working on
  • worker.workerCleanup() only works for *nix operating systems (osx, unix, solaris, etc)
  • If you are using any plugins which effect beforeEnqueue or afterEnqueue, be sure to pass the jobs argument to the new Queue constructor
  • If you plan to run more than one worker per nodejs process, be sure to name them something distinct. Names must follow the patern hostname:pid+unique_id. For example:
var name = os.hostname() + ":" + process.pid() + "+" + counter;
var worker = new NR.worker({connection: connectionDetails, queues: 'math', 'name' : name}, jobs);

Queue Managment

Additonal methods provided on the queue object:

  • queue.prototype.queues = function(callback)
    • callback(error, array_of_queues)
  • queue.prototype.length = function(q, callback)
    • callback(error, number_of_elements_in_queue)
  • queue.prototype.del = function(q, func, args, count, callback)
    • callback(error, number_of_items_deleted)
  • queue.prototype.delDelayed = function(q, func, args, callback)
    • callback(error, timestamps_the_job_was_removed_from)
  • queue.prototype.scheduledAt = function(q, func, args, callback)
    • callback(error, timestamps_the_job_is_scheduled_for)

Plugins

TODO: have a way to load plugins so they don't need to be in this package

Just like ruby's resque, you can write worker plugins. They look look like this. The 4 hooks you have are before_enqueue, after_enqueue, before_perform, and after_perform

var myPlugin = function(worker, func, queue, job, args, options){
  var self = this;
  self.worker = worker;
  self.queue = queue;
  self.func = func;
  self.job = job;
  self.args = args;
  self.options = options;
}

////////////////////
// PLUGIN METHODS //
////////////////////

myPlugin.prototype.before_enqueue = function(callback){
  // console.log("** before_enqueue")
  callback(null, true);
}

myPlugin.prototype.after_enqueue = function(callback){
  // console.log("** after_enqueue")
  callback(null, true);
}

myPlugin.prototype.before_perform = function(callback){
  // console.log("** before_perform")
  callback(null, true);
}

myPlugin.prototype.after_perform = function(callback){
  // console.log("** after_perform")
  callback(null, true);
}

And then your plugin can be invoked within a job like this:

var jobs = {
  "add": {
    plugins: [ 'myPlugin' ],
    pluginOptions: {
      myPlugin: { thing: 'stuff' },
    },
    perform: function(a,b,callback){
      var answer = a + b; 
      callback(answer);
    },
  },
}

notes

  • All plugins which return (error, toRun). if toRun = false on beforeEnqueue, the job beign inqueued will be thrown away, and if toRun = false on beforePerfporm, the job will be reEnqued and not run at this time. However, it doesn't really matter what toRun returns on the after hooks.

Acknowledgments

Most of this code was inspired by / stolen from coffee-resque and coffee-resque-scheduler. Thanks!

About

It's Resque... In node!

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published