Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge of high priority issue fixes and pull requests on top of version 0.6.2 #256

Merged
merged 48 commits into from
Jan 25, 2014
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
3e84143
add limit parameter to Queue.promote
behrad Oct 18, 2013
074866d
Merge pull request #3 from behrad/parameterize-job-promotion-limit
behrad Oct 18, 2013
06cc220
Merge with
behrad Oct 18, 2013
ef3ab15
Merge with aventuralabs:master
behrad Oct 18, 2013
14d7bda
Merge remote-tracking branch 'origin/search_key_fixes'
behrad Oct 18, 2013
868d67d
Revert "Merge with"
behrad Oct 18, 2013
2733c1c
Merge with aventuralabs:master
behrad Oct 18, 2013
b055d11
Revert "Merge with"
behrad Oct 18, 2013
3e4b667
Revert "Revert "Merge with""
behrad Oct 18, 2013
2ad6520
Merge remote-tracking branch 'origin/search_key_fixes'
behrad Oct 18, 2013
8e3a73a
Search index fixes
behrad Oct 18, 2013
d58a0cd
Revert "add limit parameter to Queue.promote"
behrad Oct 19, 2013
a4eaa80
Merge branch 'master' of https://github.com/behrad/kue
behrad Oct 19, 2013
2e98482
stable merge
behrad Oct 19, 2013
977558c
Disable search indexes
behrad Oct 19, 2013
85ca071
Worker job processing pause/resume support
behrad Oct 28, 2013
fe0162c
Worker job processing pause/resume fix
behrad Oct 28, 2013
245e2d6
Disable search indexing (indexes leakage on job removal)
behrad Oct 28, 2013
c3748be
Shutdown support for specific worker types
behrad Oct 28, 2013
287f75b
Shutdown race fixes
behrad Nov 27, 2013
e7e0e97
Shutdown race fixes
behrad Nov 27, 2013
5c27ca4
Shutdown race fixes
behrad Nov 27, 2013
b3035c0
Job Force Shutdown Action
behrad Jan 18, 2014
c8d7618
test suport
behrad Jan 20, 2014
ae37b6f
fix docs
behrad Jan 20, 2014
1113138
test support
behrad Jan 20, 2014
04a4fb5
bump to 0.7.0
behrad Jan 20, 2014
d06a88e
bump to 0.7.0
behrad Jan 20, 2014
2d76534
bump to 0.7.0
behrad Jan 20, 2014
5c43477
bump to 0.7.0
behrad Jan 20, 2014
356a314
bump to 0.7.0
behrad Jan 20, 2014
fc92351
bump to 0.7.0
behrad Jan 20, 2014
2503572
first test
behrad Jan 20, 2014
7df4bc8
fix this context
behrad Jan 20, 2014
a0f7aad
add redis support
behrad Jan 20, 2014
b9d830e
fix typo
behrad Jan 20, 2014
6069a42
bump to 0.7.0
behrad Jan 20, 2014
50f033b
bump to 0.7.0
behrad Jan 22, 2014
5aba5f8
fix job failed event on first attempt
behrad Jan 22, 2014
9f168c0
bump to 0.7.0
behrad Jan 22, 2014
2241c8a
bump to 0.7.0
behrad Jan 22, 2014
052fee9
fix redirection
behrad Jan 22, 2014
4c9eb6b
bump to 0.7.0
behrad Jan 22, 2014
b1c2d3d
bump to 0.7.0
behrad Jan 22, 2014
cdaa2f7
bump to 0.7.0
behrad Jan 22, 2014
ec977c6
bump to 0.7.0
behrad Jan 22, 2014
5097b1d
update docs
behrad Jan 25, 2014
f440b7b
bump to 0.7.0
behrad Jan 25, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.idea
.DS_Store
node_modules
*.sock
testing
lib/http/public/stylesheets/main.css
*.rdb
test/incomplete
179 changes: 172 additions & 7 deletions lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ var EventEmitter = require('events').EventEmitter
, Worker = require('./queue/worker')
, events = require('./queue/events')
, Job = require('./queue/job')
, redis = require('./redis');
, redis = require('./redis')
, reds = require('reds')
, async = require('async');


/**
* Expose `Queue`.
Expand Down Expand Up @@ -55,6 +58,19 @@ Object.defineProperty(exports, 'app', {

exports.redis = redis;


/**
* Search instance.
*/

var search;
function getSearch() {
if (search) return search;
reds.createClient = require('./redis').createClient;
return search = reds.createSearch('q:search');
};


/**
* Create a new `Queue`.
*
Expand Down Expand Up @@ -98,8 +114,8 @@ Queue.prototype.__proto__ = EventEmitter.prototype;
*/

Queue.prototype.create =
Queue.prototype.createJob = function(type, data){
return new Job(type, data);
Queue.prototype.createJob = function(type, data, options){
return new Job(type, data, options);
};

/**
Expand Down Expand Up @@ -205,6 +221,39 @@ Queue.prototype.process = function(type, n, fn){
}
};


/**
* Initializes garbage collection for jobs of `type` after `n` milliseconds.
* Currently, there is a 1000 millisecond minimum (this should not be primary form of removal)
*
* @param {String} type
* @param {Number} n
* @api public
*/

Queue.prototype.setExpiration = function(type, n){
var self = this,
client = this.client,
n = Math.max(n, 1000);


setInterval(function() {

var cutoff = Math.floor((new Date().getTime() - n) / 1000);

self.client.zrangebyscore('q:expiring:' + type, 0, cutoff, function(err, results) {

if (!err && results.length)
for (var i = 0; i < results.length; ++i)
Job.remove(results[i]);

});
}, n);

return this;
};


/**
* Graceful shutdown
*
Expand Down Expand Up @@ -248,6 +297,84 @@ Queue.prototype.types = function(fn){
return this;
};


/**
* Enables job retrieval by id
*
* @param {Number} id
* @param {Function} fn
* @api public
*/
Queue.prototype.get =
Queue.prototype.getById = function(id, fn){
Job.get(id, fn);
};


/**
* Determines whether a job is outstanding. If input is number,
* taken to be a Job ID. If a string, taken to be a type + key. If an object
* the key should be either id or key
*
* @param {Number} args or {Object}
* @param {Function} fn
* @api public
*/

Queue.prototype.isOutstanding = function(args, fn){
var isKey = false,
val = null;

if ("object" == typeof args) {
if ("undefined" != typeof args.id) {
val = args.id;
} else if ("undefined" != typeof args.key && "undefined" != typeof args.type) {
isKey = true;
} else {
return fn(new Error("No key or id specified for outstanding check."))
}
} else if ("number" == typeof args) {
val = args;
} else {
return fn(new Error("No key or id specified for outstanding check."))
}

if (isKey)
this.client.sismember("q:outstanding:" + args.type, args.key, function(err, isMember) {
if (err)
return fn(err, null);
else
return fn(null, 1 == isMember);
});
else {
this.get(val, function(err, job) {
if (err)
return fn(err, false);
else {
var state = job.state();
return fn(err, ('active' == state || 'inactive' == state));
}
});
}
};


/**
* Find jobs by a series of identifiers and callback `fn(err)`.
*
* @param {String} lookup
* @param {Function} fn
* @api public
*/

Queue.prototype.find = function(query, fn){
getSearch().query(query).end(function(err, ids){
fn(err, ids);
}, 'and');
};



/**
* Return job ids with the given `state`, and callback `fn(err, ids)`.
*
Expand All @@ -257,11 +384,23 @@ Queue.prototype.types = function(fn){
* @api public
*/

Queue.prototype.state = function(state, fn){
this.client.zrange('q:jobs:' + state, 0, -1, fn);
Queue.prototype.state =
Queue.prototype.states = function(states, fn){
if ('string' == typeof states)
this.client.zrange('q:jobs:' + states, 0, -1, fn);
else {
var self = this;
function getByState(state, cb) { self.client.zrange('q:jobs:' + state, 0, -1, cb); }
async.map(states, getByState, function(err, ids) {
self = null;
fn(err, Array.prototype.concat.apply([], ids));
});
}
return this;
};



/**
* Get queue work time in milliseconds and invoke `fn(err, ms)`.
*
Expand All @@ -287,8 +426,18 @@ Queue.prototype.workTime = function(fn){
* @api public
*/

Queue.prototype.card = function(state, fn){
this.client.zcard('q:jobs:' + state, fn);
Queue.prototype.card =
Queue.prototype.cards = function(states, fn){
if ('string' == typeof states)
this.client.zcard('q:jobs:' + states, fn);
else {
var self = this;
function countByState(state, cb) { self.client.zcard('q:jobs:' + state, cb); }
async.map(states, countByState, function(err, counts) {
self = null;
fn(err, counts.reduce(function(memo, num) { return memo + num; }, 0));
});
}
return this;
};

Expand Down Expand Up @@ -324,6 +473,14 @@ Queue.prototype.active = function(fn){
return this.state('active', fn);
};

/**
* Oustanding jobs (inactive or active).
*/

Queue.prototype.outstanding = function(fn){
return this.state(['active','inactive'], fn);
};

/**
* Completed jobs count.
*/
Expand Down Expand Up @@ -356,6 +513,14 @@ Queue.prototype.activeCount = function(fn){
return this.card('active', fn);
};

/**
* Outstanding jobs (active or inactive).
*/

Queue.prototype.outstandingCount = function(fn){
return this.card(['active','inactive'], fn);
};

/**
* Delayed jobs.
*/
Expand Down
28 changes: 18 additions & 10 deletions lib/queue/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,20 @@ exports.onMessage = function(channel, msg){
var msg = JSON.parse(msg);

// map to Job when in-process
var job = exports.jobs[msg.id];
if (job) {
job.emit.apply(job, msg.args);
if (['complete', 'failed'].indexOf(msg.event) !== -1) exports.remove(job);
if ('indexed' == msg.events) {
this.emit(msg.id, 'indexed');
} else {
var job = exports.jobs[msg.id];
if (job) {
job.emit.apply(job, msg.args);
if (['complete', 'failed'].indexOf(msg.event) !== -1) exports.remove(job);
}

// emit args on Queues
msg.args[0] = 'job ' + msg.args[0];
msg.args.push(msg.id);
exports.queue.emit.apply(exports.queue, msg.args);
}

// emit args on Queues
msg.args[0] = 'job ' + msg.args[0];
msg.args.push(msg.id);
exports.queue.emit.apply(exports.queue, msg.args);
};

/**
Expand All @@ -102,5 +106,9 @@ exports.emit = function(id, event) {
, event: event
, args: [].slice.call(arguments, 1)
});
client.publish(exports.key, msg);

if ('indexed' == event)
client.publish('q:indexing', msg);
else
client.publish(exports.key, msg);
};
Loading