Skip to content

Commit

Permalink
Use multi transactions to increase saftey with delayed jobs (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Feb 15, 2020
1 parent 74132d3 commit e775f20
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
2 changes: 1 addition & 1 deletion __tests__/core/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ describe("worker", () => {
});
});

// TODO: Typescript seems to have troble with frozen objects
// TODO: Typescript seems to have trouble with frozen objects
// test('job arguments are immutable', async (done) => {
// await queue.enqueue(specHelper.queue, 'messWithData', { a: 'starting value' })

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "node-resque",
"description": "an opinionated implementation of resque in node",
"license": "Apache-2.0",
"version": "6.0.7",
"version": "6.0.8",
"homepage": "https://github.com/actionhero/node-resque",
"repository": {
"type": "git",
Expand Down
31 changes: 16 additions & 15 deletions src/core/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,16 @@ export class Queue extends EventEmitter {
throw new Error("Job already enqueued at this time with same arguments");
}

await this.connection.redis.rpush(
this.connection.key("delayed:" + rTimestamp),
item
);
await this.connection.redis.sadd(
this.connection.key("timestamps:" + item),
"delayed:" + rTimestamp
);
await this.connection.redis.zadd(
this.connection.key("delayed_queue_schedule"),
rTimestamp.toString(),
rTimestamp.toString()
);
await this.connection.redis
.multi()
.rpush(this.connection.key("delayed:" + rTimestamp), item)
.sadd(this.connection.key("timestamps:" + item), "delayed:" + rTimestamp)
.zadd(
this.connection.key("delayed_queue_schedule"),
rTimestamp.toString(),
rTimestamp.toString()
)
.exec();
}
/**
* - In ms, the number of ms to delay before this job is able to start being worked on.
Expand Down Expand Up @@ -139,8 +136,12 @@ export class Queue extends EventEmitter {
* - delete a queue, and all jobs in that queue.
*/
async delQueue(q: string) {
await this.connection.redis.del(this.connection.key("queue", q));
await this.connection.redis.srem(this.connection.key("queues"), q);
const { redis } = this.connection;
await redis
.multi()
.del(this.connection.key("queue", q))
.srem(this.connection.key("queues"), q)
.exec();
}

/**
Expand Down
12 changes: 7 additions & 5 deletions src/core/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,16 @@ export class Scheduler extends EventEmitter {

private async cleanupTimestamp(timestamp: number) {
const key = this.connection.key("delayed:" + timestamp);
await this.connection.redis.watch(key);
const length = await this.connection.redis.llen(key);
if (length === 0) {
await this.connection.redis.del(key);
await this.connection.redis.zrem(
this.connection.key("delayed_queue_schedule"),
timestamp
);
await this.connection.redis
.multi()
.del(key)
.zrem(this.connection.key("delayed_queue_schedule"), timestamp)
.exec();
}
await this.connection.redis.unwatch();
}

private async checkStuckWorkers() {
Expand Down

0 comments on commit e775f20

Please sign in to comment.