Skip to content

Commit

Permalink
Per-job timeouts. (#1310)
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Dec 18, 2007
1 parent 2b6ea26 commit f568f77
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 23 deletions.
10 changes: 8 additions & 2 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Producer Command
The "put" command is for any process that wants to insert a job into the queue.
It comprises a command line followed by the job body:

put <pri> <delay> <bytes>\r\n
put <pri> <delay> <ttr> <bytes>\r\n
<data>\r\n

It inserts a job into the queue.
Expand All @@ -89,6 +89,11 @@ It inserts a job into the queue.

- <delay> is an integer number of seconds to wait before putting the job in
the ready queue. The job will be in the "delayed" state during this time.

- <ttr> -- time to run -- is an integer number of seconds to allow a worker
to run this job. This time is counted from the moment a worker reserves
this job. If the worker does not delete, release, or bury the job within
<ttr> seconds, the job will time out and the server will release the job.

- <bytes> is an integer (currently must be < 2**16) indicating the size of
the job body, not including the trailing "\r\n".
Expand All @@ -115,8 +120,9 @@ reserve\r\n

This will return a newly-reserved job. If no job is available to be reserved,
beanstalkd will wait to send a response until one becomes available. Once a job
is reserved for the client, the client has 120 seconds to run the job before
is reserved for the client, the client has limited time to run the job before
the job times out, when the server will put the job back into the ready queue.
The time available can be found by asking the server for the job's stats.

There is only one possible response in the form of a text line followed by the
job body:
Expand Down
3 changes: 2 additions & 1 deletion job.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ allocate_job(int body_size)
}

job
make_job(unsigned int pri, unsigned int delay, int body_size)
make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size)
{
job j;

Expand All @@ -52,6 +52,7 @@ make_job(unsigned int pri, unsigned int delay, int body_size)
j->id = next_id++;
j->pri = pri;
j->delay = delay;
j->ttr = ttr;

return j;
}
Expand Down
4 changes: 3 additions & 1 deletion job.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct job {
unsigned long long int id;
unsigned int pri;
unsigned int delay;
unsigned int ttr;
int body_size;
time_t creation;
time_t deadline;
Expand All @@ -46,7 +47,8 @@ struct job {
};

job allocate_job(int body_size);
job make_job(unsigned int pri, unsigned int delay, int body_size);
job make_job(unsigned int pri, unsigned int delay, unsigned int ttr,
int body_size);

typedef int(*job_cmp_fn)(job, job);
int job_pri_cmp(job a, job b);
Expand Down
20 changes: 16 additions & 4 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,14 @@ read_delay(unsigned int *delay, const char *buf, char **end)
return read_pri(delay, buf, end);
}

/* Read a timeout value from the given buffer and place it in ttr.
* The interface and behavior are the same as in read_pri(). */
static int
read_ttr(unsigned int *ttr, const char *buf, char **end)
{
return read_pri(ttr, buf, end);
}

static void
wait_for_job(conn c)
{
Expand Down Expand Up @@ -540,6 +548,7 @@ fmt_job_stats(char *buf, size_t size, void *jp)
job_state(j),
(unsigned int) (t - j->creation),
j->delay,
j->ttr,
(unsigned int) (j->deadline - t),
j->timeout_ct,
j->release_ct,
Expand All @@ -566,8 +575,8 @@ dispatch_cmd(conn c)
unsigned int count;
job j;
char type;
char *size_buf, *delay_buf, *pri_buf, *end_buf;
unsigned int pri, delay, body_size;
char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf;
unsigned int pri, delay, ttr, body_size;
unsigned long long int id;

/* NUL-terminate this string so we can use strtol and friends */
Expand All @@ -586,7 +595,10 @@ dispatch_cmd(conn c)
r = read_pri(&pri, c->cmd + 4, &delay_buf);
if (r) return conn_close(c);

r = read_delay(&delay, delay_buf, &size_buf);
r = read_delay(&delay, delay_buf, &ttr_buf);
if (r) return conn_close(c);

r = read_ttr(&ttr, ttr_buf, &size_buf);
if (r) return conn_close(c);

errno = 0;
Expand All @@ -600,7 +612,7 @@ dispatch_cmd(conn c)

conn_set_producer(c);

c->in_job = make_job(pri, delay, body_size + 2);
c->in_job = make_job(pri, delay, ttr, body_size + 2);

fill_extra_data(c);

Expand Down
4 changes: 1 addition & 3 deletions prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@

#define URGENT_THRESHOLD 1024

/* measured in seconds */
#define RESERVATION_TIMEOUT 120

#define MSG_RESERVED "RESERVED"

#define CMD_PUT "put "
Expand Down Expand Up @@ -101,6 +98,7 @@
"state: %s\n" \
"age: %u\n" \
"delay: %u\n" \
"ttr: %u\n" \
"time-left: %u\n" \
"timeouts: %u\n" \
"releases: %u\n" \
Expand Down
2 changes: 1 addition & 1 deletion reserve.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static struct conn running = { &running, &running, 0 };
void
reserve_job(conn c, job j)
{
j->deadline = time(NULL) + RESERVATION_TIMEOUT;
j->deadline = time(NULL) + j->ttr;
cur_reserved_ct++; /* stats */
conn_insert(&running, c);
j->state = JOB_STATE_RESERVED;
Expand Down
10 changes: 5 additions & 5 deletions tests/test_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ __CUT__job_test_creation()
{
job j;

j = make_job(1, 0, 0);
j = make_job(1, 0, 1, 0);
ASSERT(j->pri == 1, "priority should match");
}

Expand All @@ -23,8 +23,8 @@ __CUT__job_test_cmp_pris()
{
job a, b;

a = make_job(1, 0, 0);
b = make_job(1 << 27, 0, 0);
a = make_job(1, 0, 1, 0);
b = make_job(1 << 27, 0, 1, 0);

ASSERT(job_pri_cmp(a, b) < 0, "should be a < b");
}
Expand All @@ -34,8 +34,8 @@ __CUT__job_test_cmp_ids()
{
job a, b;

a = make_job(1, 0, 0);
b = make_job(1, 0, 0);
a = make_job(1, 0, 1, 0);
b = make_job(1, 0, 1, 0);

b->id <<= 49;
ASSERT(job_pri_cmp(a, b) < 0, "should be a < b");
Expand Down
12 changes: 6 additions & 6 deletions tests/test_pq.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ __CUT_BRINGUP__pq()
/* When CUT 3.0 comes out it will fix this design flaw. For now we will
* just leak some queues during test. */
/*q = make_pq(2, job_pri_cmp);*/
j1 = make_job(1, 0, 0);
j2 = make_job(2, 0, 0);
j3a = make_job(3, 0, 0);
j3b = make_job(3, 0, 0);
j3c = make_job(3, 0, 0);
j1 = make_job(1, 0, 1, 0);
j2 = make_job(2, 0, 1, 0);
j3a = make_job(3, 0, 1, 0);
j3b = make_job(3, 0, 1, 0);
j3c = make_job(3, 0, 1, 0);
/*ASSERT(!!q, "Allocation should work");*/
ASSERT(!!j1, "Allocation should work");
ASSERT(!!j2, "Allocation should work");
Expand Down Expand Up @@ -122,7 +122,7 @@ __CUT__pq_test_many_jobs()
q = make_pq(HOW_MANY, job_pri_cmp);

for (i = 0; i < HOW_MANY; i++) {
j = make_job(1 + rand() % 8192, 0, 0);
j = make_job(1 + rand() % 8192, 0, 1, 0);
ASSERT(!!j, "allocation");
r = pq_give(q, j);
ASSERT(r, "insert should succeed");
Expand Down

0 comments on commit f568f77

Please sign in to comment.