Skip to content

Commit

Permalink
Make a layer of abstraction for reserving jobs.
Browse files Browse the repository at this point in the history
This will make it easier to support reserving more than one job.
  • Loading branch information
Keith Rarick committed Oct 1, 2007
1 parent a0be436 commit b9270f3
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 18 deletions.
22 changes: 10 additions & 12 deletions beanstalkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ dispatch_cmd(conn c)
conn_set_worker(c);

/* does this conn already have a job reserved? */
if (c->reserved_job) return reply_job(c, c->reserved_job, MSG_RESERVED);
if (has_reserved_job(c)) return reply_job(c, c->reserved_job, MSG_RESERVED);

/* try to get a new job for this guy */
wait_for_job(c);
Expand All @@ -268,14 +268,12 @@ dispatch_cmd(conn c)

delete_ct++; /* stats */

if (!c->reserved_job || id != c->reserved_job->id) {
reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
break;
}
j = remove_reserved_job(c, id);

if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);

free(c->reserved_job);
c->reserved_job = NULL;
conn_remove(c); /* remove it from the running_list */
free(j);
if (!has_reserved_job(c)) conn_remove(c); /* remove from running_list */

reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
break;
Expand Down Expand Up @@ -329,7 +327,7 @@ reset_conn(conn c)
if (r == -1) return warn("update events failed"), conn_close(c);

/* was this a peek or stats command? */
if (c->out_job != c->reserved_job) free(c->out_job);
if (!has_reserved_this_job(c, c->out_job)) free(c->out_job);
c->out_job = NULL;

c->reply_sent = 0; /* now that we're done, reset this */
Expand Down Expand Up @@ -431,14 +429,14 @@ h_conn_data(conn c)
static void
h_conn_timeout(conn c)
{
job j = c->reserved_job;
job j = soonest_job(c);

if (!j) return;

timeout_ct++; /* stats */
enqueue_job(j);
c->reserved_job = NULL;
conn_remove(c); /* remove it from the running_list */
job_remove(c, j);
if (!has_reserved_job(c)) conn_remove(c); /* remove from running_list */
}

#define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
Expand Down
13 changes: 8 additions & 5 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ conn_set_evq(conn c, const int events, evh handler)

event_set(&c->evq, c->fd, events, handler, c);

if (c->reserved_job) tv.tv_sec = c->reserved_job->deadline - time(NULL);
if (has_reserved_job(c)) tv.tv_sec = soonest_job(c)->deadline - time(NULL);

r = event_add(&c->evq, c->reserved_job ? &tv : NULL);
r = event_add(&c->evq, has_reserved_job(c) ? &tv : NULL);
if (r == -1) return -1;

return 0;
Expand Down Expand Up @@ -165,10 +165,13 @@ conn_close(conn c)

close(c->fd);

if (c->reserved_job) enqueue_job(c->reserved_job);
free(c->in_job);
if (c->out_job != c->reserved_job) free(c->out_job); /* peek command? */
c->in_job = c->out_job = c->reserved_job = NULL;

/* was this a peek or stats command? */
if (!has_reserved_this_job(c, c->out_job)) free(c->out_job);

if (has_reserved_job(c)) enqueue_reserved_jobs(c);
c->in_job = c->out_job = NULL;


if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
Expand Down
47 changes: 46 additions & 1 deletion prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,56 @@ static void
reserve_job(conn c, job j)
{
j->deadline = time(NULL) + RESERVATION_TIMEOUT;
if (!has_reserved_job(c)) conn_insert(&running_list, c);
c->reserved_job = j;
conn_insert(&running_list, c);
return reply_job(c, j, MSG_RESERVED);
}

int
has_reserved_job(conn c)
{
return !!c->reserved_job;
}

/* return the reserved job with the earliest deadline,
* or NULL if there's no reserved job */
job
soonest_job(conn c)
{
return c->reserved_job;
}

void
enqueue_reserved_jobs(conn c)
{
enqueue_job(c->reserved_job);
c->reserved_job = NULL;
}

int
has_reserved_this_job(conn c, job j)
{
return c->reserved_job == j;
}

job
remove_reserved_job(conn c, unsigned long long int id)
{
job j;

if (!c->reserved_job) return NULL;
if (id != c->reserved_job->id) return NULL;
j = c->reserved_job;
c->reserved_job = NULL;
return j;
}

void
job_remove(conn c, job j)
{
if (c->reserved_job == j) c->reserved_job = NULL;
}

int
count_reserved_jobs()
{
Expand Down
7 changes: 7 additions & 0 deletions prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ void process_queue();

job peek_job(unsigned long long int id);

int has_reserved_job(conn c);
job soonest_job(conn c);
void enqueue_reserved_jobs(conn c);
int has_reserved_this_job(conn c, job j);
job remove_reserved_job(conn c, unsigned long long int id);
void job_remove(conn c, job j);

int count_reserved_jobs();
unsigned int count_ready_jobs();

Expand Down

0 comments on commit b9270f3

Please sign in to comment.