Skip to content

Commit

Permalink
Bug beanstalkd#648 - reserve more than one job at a time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Oct 1, 2007
1 parent bd8e98e commit 99add0f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 37 deletions.
18 changes: 9 additions & 9 deletions beanstalkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,6 @@ dispatch_cmd(conn c)
reserve_ct++; /* stats */
conn_set_worker(c);

/* does this conn already have a job reserved? */
if (has_reserved_job(c)) return reply_job(c, c->reserved_job, MSG_RESERVED);

/* try to get a new job for this guy */
wait_for_job(c);
process_queue();
Expand Down Expand Up @@ -429,13 +426,16 @@ h_conn_data(conn c)
static void
h_conn_timeout(conn c)
{
job j = soonest_job(c);

if (!j) return;
int r;
job j;

timeout_ct++; /* stats */
enqueue_job(j);
remove_this_reserved_job(c, j);
while ((j = soonest_job(c))) {
if (j->deadline > time(NULL)) return;
timeout_ct++; /* stats */
enqueue_job(remove_this_reserved_job(c, j));
r = conn_update_evq(c, c->evq.ev_events);
if (r == -1) return warn("conn_update_evq() failed"), conn_close(c);
}
}

#define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
Expand Down
7 changes: 4 additions & 3 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ conn_free(conn c)
conn
make_conn(int fd, char start_state)
{
job j;
conn c;

c = conn_alloc();
Expand All @@ -54,9 +55,11 @@ make_conn(int fd, char start_state)
c->state = start_state;
c->type = 0;
c->cmd_read = 0;
c->in_job = c->out_job = c->reserved_job = NULL;
c->in_job = c->out_job = NULL;
c->in_job_read = c->out_job_sent = 0;
c->prev = c->next = c; /* must be out of a linked list right now */
j = &c->reserved_jobs;
j->prev = j->next = j;

cur_conn_ct++; /* stats */

Expand Down Expand Up @@ -120,8 +123,6 @@ conn_update_evq(conn c, const int events)

if (!c) return -1;

if (c->evq.ev_events == events) return 0;

/* If it's been added, try to delete it first */
if (c->evq.ev_base) {
r = event_del(&c->evq);
Expand Down
2 changes: 1 addition & 1 deletion conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct conn {
int in_job_read;
job out_job;
int out_job_sent;
job reserved_job;
struct job reserved_jobs; /* doubly-linked list header */
};

void conn_init();
Expand Down
33 changes: 33 additions & 0 deletions job.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ allocate_job(int body_size)
if (!j) return warn("OOM"), NULL;

j->body_size = body_size;
j->next = j->prev = j; /* not in a linked list */

return j;
}
Expand Down Expand Up @@ -64,3 +65,35 @@ job_cmp(job a, job b)
}
return a->pri - b->pri;
}

int
job_list_any_p(job head)
{
return head->next != head || head->prev != head;
}

job
job_remove(job j)
{
if (!j) return NULL;
if (!job_list_any_p(j)) return NULL; /* not in a doubly-linked list */

j->next->prev = j->prev;
j->prev->next = j->next;

j->prev = j->next = j;

return j;
}

void
job_insert(job head, job j)
{
if (job_list_any_p(j)) return; /* already in a linked list */

j->prev = head->prev;
j->next = head;
head->prev->next = j;
head->prev = j;
}

11 changes: 9 additions & 2 deletions job.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

#include <time.h>

typedef struct job {
typedef struct job *job;

struct job {
job prev, next; /* linked list of jobs */
unsigned long long int id;
unsigned int pri;
int body_size;
time_t deadline;
char body[];
} *job;
};

job allocate_job(int body_size);
job make_job(unsigned int pri, int body_size);
Expand All @@ -20,4 +23,8 @@ int job_cmp(job a, job b);

job job_copy(job j);

int job_list_any_p(job head);
job job_remove(job j);
void job_insert(job head, job j);

#endif /*job_h*/
58 changes: 37 additions & 21 deletions reserve.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* reserve.c - job reservations */

#include "job.h"
#include "prot.h"
#include "reserve.h"

Expand All @@ -10,58 +11,73 @@ reserve_job(conn c, job j)
{
j->deadline = time(NULL) + RESERVATION_TIMEOUT;
cur_reserved_ct++; /* stats */
c->reserved_job = j;
job_insert(&c->reserved_jobs, j);
return reply_job(c, j, MSG_RESERVED);
}

int
has_reserved_job(conn c)
{
return !!c->reserved_job;
return job_list_any_p(&c->reserved_jobs);
}

/* return the reserved job with the earliest deadline,
* or NULL if there's no reserved job */
job
soonest_job(conn c)
{
return c->reserved_job;
job j, soonest = NULL;

for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (j->deadline <= (soonest ? : j)->deadline) soonest = j;
}
return soonest;
}

void
enqueue_reserved_jobs(conn c)
{
enqueue_job(c->reserved_job);
cur_reserved_ct--;
c->reserved_job = NULL;
while (job_list_any_p(&c->reserved_jobs)) {
enqueue_job(job_remove(c->reserved_jobs.next));
cur_reserved_ct--;
}
}

int
has_reserved_this_job(conn c, job j)
has_reserved_this_job(conn c, job needle)
{
return c->reserved_job == j;
job j;

for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (needle == j) return 1;
}
return 0;
}

job
remove_reserved_job(conn c, unsigned long long int id)
static job
find_reserved_job(conn c, unsigned long long int id)
{
job j;

if (!c->reserved_job) return NULL;
if (id != c->reserved_job->id) return NULL;
j = c->reserved_job;
cur_reserved_ct--;
c->reserved_job = NULL;
return j;
for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (j->id == id) return j;
}
return NULL;
}

void
job
remove_reserved_job(conn c, unsigned long long int id)
{
return remove_this_reserved_job(c, find_reserved_job(c, id));
}

/* j can be NULL */
job
remove_this_reserved_job(conn c, job j)
{
if (c->reserved_job == j) {
cur_reserved_ct--;
c->reserved_job = NULL;
}
j = job_remove(j);
if (j) cur_reserved_ct--;
return j;
}

unsigned int
Expand Down
2 changes: 1 addition & 1 deletion reserve.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ job soonest_job(conn c);
void enqueue_reserved_jobs(conn c);
int has_reserved_this_job(conn c, job j);
job remove_reserved_job(conn c, unsigned long long int id);
void remove_this_reserved_job(conn c, job j);
job remove_this_reserved_job(conn c, job j);

unsigned int count_reserved_jobs();

Expand Down

0 comments on commit 99add0f

Please sign in to comment.