From f568f7760495ed201052116517c165dbc1a091bf Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Mon, 17 Dec 2007 22:23:57 -0800 Subject: [PATCH] Per-job timeouts. (#1310) --- doc/protocol.txt | 10 ++++++++-- job.c | 3 ++- job.h | 4 +++- prot.c | 20 ++++++++++++++++---- prot.h | 4 +--- reserve.c | 2 +- tests/test_job.c | 10 +++++----- tests/test_pq.c | 12 ++++++------ 8 files changed, 42 insertions(+), 23 deletions(-) diff --git a/doc/protocol.txt b/doc/protocol.txt index 89dc4759..14d37ed3 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -78,7 +78,7 @@ Producer Command The "put" command is for any process that wants to insert a job into the queue. It comprises a command line followed by the job body: -put \r\n +put \r\n \r\n It inserts a job into the queue. @@ -89,6 +89,11 @@ It inserts a job into the queue. - is an integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time. + + - -- time to run -- is an integer number of seconds to allow a worker + to run this job. This time is counted from the moment a worker reserves + this job. If the worker does not delete, release, or bury the job within + seconds, the job will time out and the server will release the job. - is an integer (currently must be < 2**16) indicating the size of the job body, not including the trailing "\r\n". @@ -115,8 +120,9 @@ reserve\r\n This will return a newly-reserved job. If no job is available to be reserved, beanstalkd will wait to send a response until one becomes available. Once a job -is reserved for the client, the client has 120 seconds to run the job before +is reserved for the client, the client has limited time to run the job before the job times out, when the server will put the job back into the ready queue. +The time available can be found by asking the server for the job's stats. There is only one possible response in the form of a text line followed by the job body: diff --git a/job.c b/job.c index e1d080ac..7bc04214 100644 --- a/job.c +++ b/job.c @@ -42,7 +42,7 @@ allocate_job(int body_size) } job -make_job(unsigned int pri, unsigned int delay, int body_size) +make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size) { job j; @@ -52,6 +52,7 @@ make_job(unsigned int pri, unsigned int delay, int body_size) j->id = next_id++; j->pri = pri; j->delay = delay; + j->ttr = ttr; return j; } diff --git a/job.h b/job.h index 5d388553..7664ef4a 100644 --- a/job.h +++ b/job.h @@ -34,6 +34,7 @@ struct job { unsigned long long int id; unsigned int pri; unsigned int delay; + unsigned int ttr; int body_size; time_t creation; time_t deadline; @@ -46,7 +47,8 @@ struct job { }; job allocate_job(int body_size); -job make_job(unsigned int pri, unsigned int delay, int body_size); +job make_job(unsigned int pri, unsigned int delay, unsigned int ttr, + int body_size); typedef int(*job_cmp_fn)(job, job); int job_pri_cmp(job a, job b); diff --git a/prot.c b/prot.c index ee95ae54..d5e47282 100644 --- a/prot.c +++ b/prot.c @@ -491,6 +491,14 @@ read_delay(unsigned int *delay, const char *buf, char **end) return read_pri(delay, buf, end); } +/* Read a timeout value from the given buffer and place it in ttr. + * The interface and behavior are the same as in read_pri(). */ +static int +read_ttr(unsigned int *ttr, const char *buf, char **end) +{ + return read_pri(ttr, buf, end); +} + static void wait_for_job(conn c) { @@ -540,6 +548,7 @@ fmt_job_stats(char *buf, size_t size, void *jp) job_state(j), (unsigned int) (t - j->creation), j->delay, + j->ttr, (unsigned int) (j->deadline - t), j->timeout_ct, j->release_ct, @@ -566,8 +575,8 @@ dispatch_cmd(conn c) unsigned int count; job j; char type; - char *size_buf, *delay_buf, *pri_buf, *end_buf; - unsigned int pri, delay, body_size; + char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf; + unsigned int pri, delay, ttr, body_size; unsigned long long int id; /* NUL-terminate this string so we can use strtol and friends */ @@ -586,7 +595,10 @@ dispatch_cmd(conn c) r = read_pri(&pri, c->cmd + 4, &delay_buf); if (r) return conn_close(c); - r = read_delay(&delay, delay_buf, &size_buf); + r = read_delay(&delay, delay_buf, &ttr_buf); + if (r) return conn_close(c); + + r = read_ttr(&ttr, ttr_buf, &size_buf); if (r) return conn_close(c); errno = 0; @@ -600,7 +612,7 @@ dispatch_cmd(conn c) conn_set_producer(c); - c->in_job = make_job(pri, delay, body_size + 2); + c->in_job = make_job(pri, delay, ttr, body_size + 2); fill_extra_data(c); diff --git a/prot.h b/prot.h index 72a231bc..669c318c 100644 --- a/prot.h +++ b/prot.h @@ -29,9 +29,6 @@ #define URGENT_THRESHOLD 1024 -/* measured in seconds */ -#define RESERVATION_TIMEOUT 120 - #define MSG_RESERVED "RESERVED" #define CMD_PUT "put " @@ -101,6 +98,7 @@ "state: %s\n" \ "age: %u\n" \ "delay: %u\n" \ + "ttr: %u\n" \ "time-left: %u\n" \ "timeouts: %u\n" \ "releases: %u\n" \ diff --git a/reserve.c b/reserve.c index bd9ff22c..84da0fb2 100644 --- a/reserve.c +++ b/reserve.c @@ -28,7 +28,7 @@ static struct conn running = { &running, &running, 0 }; void reserve_job(conn c, job j) { - j->deadline = time(NULL) + RESERVATION_TIMEOUT; + j->deadline = time(NULL) + j->ttr; cur_reserved_ct++; /* stats */ conn_insert(&running, c); j->state = JOB_STATE_RESERVED; diff --git a/tests/test_job.c b/tests/test_job.c index 21c49761..6454e282 100644 --- a/tests/test_job.c +++ b/tests/test_job.c @@ -14,7 +14,7 @@ __CUT__job_test_creation() { job j; - j = make_job(1, 0, 0); + j = make_job(1, 0, 1, 0); ASSERT(j->pri == 1, "priority should match"); } @@ -23,8 +23,8 @@ __CUT__job_test_cmp_pris() { job a, b; - a = make_job(1, 0, 0); - b = make_job(1 << 27, 0, 0); + a = make_job(1, 0, 1, 0); + b = make_job(1 << 27, 0, 1, 0); ASSERT(job_pri_cmp(a, b) < 0, "should be a < b"); } @@ -34,8 +34,8 @@ __CUT__job_test_cmp_ids() { job a, b; - a = make_job(1, 0, 0); - b = make_job(1, 0, 0); + a = make_job(1, 0, 1, 0); + b = make_job(1, 0, 1, 0); b->id <<= 49; ASSERT(job_pri_cmp(a, b) < 0, "should be a < b"); diff --git a/tests/test_pq.c b/tests/test_pq.c index 7fcd0c10..1129e60a 100644 --- a/tests/test_pq.c +++ b/tests/test_pq.c @@ -13,11 +13,11 @@ __CUT_BRINGUP__pq() /* When CUT 3.0 comes out it will fix this design flaw. For now we will * just leak some queues during test. */ /*q = make_pq(2, job_pri_cmp);*/ - j1 = make_job(1, 0, 0); - j2 = make_job(2, 0, 0); - j3a = make_job(3, 0, 0); - j3b = make_job(3, 0, 0); - j3c = make_job(3, 0, 0); + j1 = make_job(1, 0, 1, 0); + j2 = make_job(2, 0, 1, 0); + j3a = make_job(3, 0, 1, 0); + j3b = make_job(3, 0, 1, 0); + j3c = make_job(3, 0, 1, 0); /*ASSERT(!!q, "Allocation should work");*/ ASSERT(!!j1, "Allocation should work"); ASSERT(!!j2, "Allocation should work"); @@ -122,7 +122,7 @@ __CUT__pq_test_many_jobs() q = make_pq(HOW_MANY, job_pri_cmp); for (i = 0; i < HOW_MANY; i++) { - j = make_job(1 + rand() % 8192, 0, 0); + j = make_job(1 + rand() % 8192, 0, 1, 0); ASSERT(!!j, "allocation"); r = pq_give(q, j); ASSERT(r, "insert should succeed");