From 6e9339ff9c177cc37787ee252fd3cf3a04f17243 Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Wed, 30 Sep 2009 21:48:30 -0700 Subject: [PATCH] Always use hi-res timers internally. This helps us to avoid off-by-one and roundoff errors that crop up with one-second-granularity timers. Also makes it easier in the future to allow clients to specify hi-res time intervals. Closes gh-5. --- binlog.c | 6 +-- conn.c | 18 +++---- conn.h | 1 - job.c | 8 ++-- job.h | 12 ++--- net.c | 13 +++-- net.h | 4 +- prot.c | 76 +++++++++++++++++------------- sh-tests/binlog-diskfull-delete.sh | 15 ++---- sh-tests/binlog-diskfull.sh | 9 +--- sh-tests/no_large_delays.commands | 2 - sh-tests/no_large_delays.expected | 1 - 12 files changed, 81 insertions(+), 84 deletions(-) delete mode 100644 sh-tests/no_large_delays.commands delete mode 100644 sh-tests/no_large_delays.expected diff --git a/binlog.c b/binlog.c index 544c17b7..7f8f99bf 100644 --- a/binlog.c +++ b/binlog.c @@ -63,7 +63,7 @@ uint64_t last_fsync = 0; char *binlog_dir = NULL; static int binlog_index = 0; -static int binlog_version = 4; +static int binlog_version = 5; static int lock_fd; static binlog oldest_binlog = 0, @@ -208,7 +208,7 @@ binlog_read_log_file(binlog b, job binlog_jobs) j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size, t, js.id); j->next = j->prev = j; - j->creation = js.creation; + j->created_at = js.created_at; job_insert(binlog_jobs, j); } if (js.body_size) { @@ -234,7 +234,7 @@ binlog_read_log_file(binlog b, job binlog_jobs) } if (j) { j->state = js.state; - j->deadline = js.deadline; + j->deadline_at = js.deadline_at; j->pri = js.pri; j->delay = js.delay; j->ttr = js.ttr; diff --git a/conn.c b/conn.c index a6d3fa6c..32f5f169 100644 --- a/conn.c +++ b/conn.c @@ -18,7 +18,6 @@ #include #include -#include #include #include @@ -27,7 +26,7 @@ #include "util.h" #include "prot.h" -#define SAFETY_MARGIN 1 /* seconds */ +#define SAFETY_MARGIN (1 * SECOND) /* Doubly-linked list of free connections. */ static struct conn pool = { &pool, &pool, 0 }; @@ -151,19 +150,20 @@ conn_set_evq(conn c, const int events, evh handler) { int r, margin = 0, should_timeout = 0; struct timeval tv = {INT_MAX, 0}; + usec t = UINT64_MAX; event_set(&c->evq, c->fd, events, handler, c); - if (conn_waiting(c)) margin = 1; + if (conn_waiting(c)) margin = SAFETY_MARGIN; if (has_reserved_job(c)) { - time_t t = soonest_job(c)->deadline - time(NULL) - margin; - tv.tv_sec = t > 0 ? t : 0; + t = soonest_job(c)->deadline_at - now_usec() - margin; should_timeout = 1; } if (c->pending_timeout >= 0) { - tv.tv_sec = min(tv.tv_sec, c->pending_timeout); + t = min(t, c->pending_timeout * SECOND); should_timeout = 1; } + if (should_timeout) timeval_from_usec(&tv, t); r = event_add(&c->evq, should_timeout ? &tv : NULL); if (r == -1) return twarn("event_add() err %d", errno), -1; @@ -226,7 +226,7 @@ soonest_job(conn c) if (soonest == NULL) { for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) { - if (j->deadline <= (soonest ? : j)->deadline) soonest = j; + if (j->deadline_at <= (soonest ? : j)->deadline_at) soonest = j; } } c->soonest_job = soonest; @@ -244,10 +244,10 @@ has_reserved_this_job(conn c, job j) int conn_has_close_deadline(conn c) { - time_t t = time(NULL); + usec t = now_usec(); job j = soonest_job(c); - return j && t >= j->deadline - SAFETY_MARGIN; + return j && t >= j->deadline_at - SAFETY_MARGIN; } int diff --git a/conn.h b/conn.h index 64d135c3..9ce6b1c6 100644 --- a/conn.h +++ b/conn.h @@ -20,7 +20,6 @@ #define conn_h #include -#include #include #include "ms.h" #include "tube.h" diff --git a/job.c b/job.c index b2de2b9b..c4472be7 100644 --- a/job.c +++ b/job.c @@ -110,7 +110,7 @@ allocate_job(int body_size) j->id = 0; j->state = JOB_STATE_INVALID; - j->creation = time(NULL); + j->created_at = now_usec(); j->reserve_ct = j->timeout_ct = j->release_ct = j->bury_ct = j->kick_ct = 0; j->body_size = body_size; j->next = j->prev = j; /* not in a linked list */ @@ -124,7 +124,7 @@ allocate_job(int body_size) } job -make_job_with_id(unsigned int pri, unsigned int delay, unsigned int ttr, +make_job_with_id(unsigned int pri, usec delay, usec ttr, int body_size, tube tube, uint64_t id) { job j; @@ -188,8 +188,8 @@ job_pri_cmp(job a, job b) int job_delay_cmp(job a, job b) { - if (a->deadline > b->deadline) return 1; - if (a->deadline < b->deadline) return -1; + if (a->deadline_at > b->deadline_at) return 1; + if (a->deadline_at < b->deadline_at) return -1; if (a->id > b->id) return 1; if (a->id < b->id) return -1; return 0; diff --git a/job.h b/job.h index b701721a..ae2bc27f 100644 --- a/job.h +++ b/job.h @@ -25,7 +25,7 @@ # include #endif /* else we get int types from config.h */ -#include +#include "util.h" typedef struct job *job; typedef int(*job_cmp_fn)(job, job); @@ -46,11 +46,11 @@ struct job { /* persistent fields; these get written to the binlog */ uint64_t id; uint32_t pri; - uint32_t delay; - uint32_t ttr; + usec delay; + usec ttr; int32_t body_size; - time_t creation; - time_t deadline; + usec created_at; + usec deadline_at; uint32_t reserve_ct; uint32_t timeout_ct; uint32_t release_ct; @@ -75,7 +75,7 @@ struct job { #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0) job allocate_job(int body_size); -job make_job_with_id(unsigned int pri, unsigned int delay, unsigned int ttr, +job make_job_with_id(unsigned int pri, usec delay, usec ttr, int body_size, tube tube, uint64_t id); void job_free(job j); diff --git a/net.c b/net.c index 4da58449..6720c885 100644 --- a/net.c +++ b/net.c @@ -25,7 +25,7 @@ static int listen_socket = -1; static struct event listen_evq; static evh accept_handler; -static time_t main_deadline = 0; +static usec main_deadline = 0; static int brakes_are_on = 1, after_startup = 0; int @@ -99,12 +99,15 @@ unbrake(evh h) } void -set_main_timeout(time_t deadline) +set_main_timeout(usec deadline_at) { int r; - struct timeval tv = {deadline - time(NULL), 0}; + struct timeval tv; + usec now = now_usec(); - main_deadline = deadline; - r = event_add(&listen_evq, deadline ? &tv : NULL); + timeval_from_usec(&tv, deadline_at - now); + + main_deadline = deadline_at; + r = event_add(&listen_evq, deadline_at ? &tv : NULL); if (r == -1) twarn("event_add()"); } diff --git a/net.h b/net.h index f2df8337..38a4a7a5 100644 --- a/net.h +++ b/net.h @@ -17,7 +17,6 @@ #ifndef net_h #define net_h -#include #include #include #include @@ -26,11 +25,12 @@ #include "event.h" #include "conn.h" +#include "util.h" int make_server_socket(struct in_addr host_addr, int port); void brake(); void unbrake(evh h); -void set_main_timeout(time_t deadline); +void set_main_timeout(usec deadline_at); #endif /*net_h*/ diff --git a/prot.c b/prot.c index b9453ddb..84960a18 100644 --- a/prot.c +++ b/prot.c @@ -214,10 +214,10 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT; "tube: %s\n" \ "state: %s\n" \ "pri: %u\n" \ - "age: %u\n" \ - "delay: %u\n" \ - "ttr: %u\n" \ - "time-left: %u\n" \ + "age: %llu\n" \ + "delay: %llu\n" \ + "ttr: %llu\n" \ + "time-left: %llu\n" \ "reserves: %u\n" \ "timeouts: %u\n" \ "releases: %u\n" \ @@ -236,7 +236,7 @@ static struct stats global_stat = {0, 0, 0, 0, 0}; static tube default_tube; static int drain_mode = 0; -static time_t start_time; +static usec started_at; static uint64_t op_ct[TOTAL_OPS], timeout_ct = 0; @@ -348,7 +348,7 @@ remove_waiting_conn(conn c) static void reserve_job(conn c, job j) { - j->deadline = time(NULL) + j->ttr; + j->deadline_at = now_usec() + j->ttr; global_stat.reserved_ct++; /* stats */ j->tube->stat.reserved_ct++; j->reserve_ct++; @@ -356,7 +356,7 @@ reserve_job(conn c, job j) j->state = JOB_STATE_RESERVED; job_insert(&c->reserved_jobs, j); j->reserver = c; - if (c->soonest_job && j->deadline < c->soonest_job->deadline) { + if (c->soonest_job && j->deadline_at < c->soonest_job->deadline_at) { c->soonest_job = j; } return reply_job(c, j, MSG_RESERVED); @@ -413,7 +413,7 @@ delay_q_peek() t = tubes.items[i]; nj = pq_peek(&t->delay); if (!nj) continue; - if (!j || nj->deadline < j->deadline) j = nj; + if (!j || nj->deadline_at < j->deadline_at) j = nj; } return j; @@ -424,17 +424,17 @@ set_main_delay_timeout() { job j; - set_main_timeout((j = delay_q_peek()) ? j->deadline : 0); + set_main_timeout((j = delay_q_peek()) ? j->deadline_at : 0); } static int -enqueue_job(job j, unsigned int delay, char update_store) +enqueue_job(job j, usec delay, char update_store) { int r; j->reserver = NULL; if (delay) { - j->deadline = time(NULL) + delay; + j->deadline_at = now_usec() + delay; r = pq_give(&j->tube->delay, j); if (!r) return 0; j->state = JOB_STATE_DELAYED; @@ -648,7 +648,7 @@ touch_job(conn c, job j) { j = find_reserved_job_in_conn(c, j); if (j) { - j->deadline = time(NULL) + j->ttr; + j->deadline_at = now_usec() + j->ttr; c->soonest_job = NULL; } return j; @@ -797,7 +797,7 @@ enqueue_incoming_job(conn c) static unsigned int uptime() { - return time(NULL) - start_time; + return (now_usec() - started_at) / 1000000; } static int @@ -881,20 +881,25 @@ read_pri(unsigned int *pri, const char *buf, char **end) } /* Read a delay value from the given buffer and place it in delay. - * The interface and behavior are the same as in read_pri(). */ + * The interface and behavior are analogous to read_pri(). */ static int -read_delay(unsigned int *delay, const char *buf, char **end) +read_delay(usec *delay, const char *buf, char **end) { - time_t now = time(NULL); - return read_pri(delay, buf, end) ? : (now + *delay < now); + int r; + unsigned int delay_sec; + + r = read_pri(&delay_sec, buf, end); + if (r) return r; + *delay = delay_sec * 1000000; + return 0; } /* Read a timeout value from the given buffer and place it in ttr. - * The interface and behavior are the same as in read_pri(). */ + * The interface and behavior are the same as in read_delay(). */ static int -read_ttr(unsigned int *ttr, const char *buf, char **end) +read_ttr(usec *ttr, const char *buf, char **end) { - return read_pri(ttr, buf, end); + return read_delay(ttr, buf, end); } static void @@ -970,18 +975,18 @@ do_list_tubes(conn c, ms l) static int fmt_job_stats(char *buf, size_t size, job j) { - time_t t; + usec t; - t = time(NULL); + t = now_usec(); return snprintf(buf, size, JOB_STATS_FMT, j->id, j->tube->name, job_state(j), j->pri, - (unsigned int) (t - j->creation), - j->delay, - j->ttr, - (unsigned int) (j->deadline - t), + (t - j->created_at) / 1000000, + j->delay / 1000000, + j->ttr / 1000000, + (j->deadline_at - t) / 1000000, j->reserve_ct, j->timeout_ct, j->release_ct, @@ -1061,7 +1066,8 @@ dispatch_cmd(conn c) job j; unsigned char type; char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name; - unsigned int pri, delay, ttr, body_size; + unsigned int pri, body_size; + usec delay, ttr; uint64_t id; tube t = NULL; @@ -1439,7 +1445,7 @@ h_conn_timeout(conn c) /* Check if any reserved jobs have run out of time. We should do this * whether or not the client is waiting for a new reservation. */ while ((j = soonest_job(c))) { - if (j->deadline >= time(NULL)) break; + if (j->deadline_at >= now_usec()) break; /* This job is in the middle of being written out. If we return it to * the ready queue, someone might free it before we finish writing it @@ -1636,11 +1642,11 @@ h_delay() { int r; job j; - time_t t; + usec t; - t = time(NULL); + t = now_usec(); while ((j = delay_q_peek())) { - if (j->deadline > t) break; + if (j->deadline_at > t) break; j = delay_q_take(); r = enqueue_job(j, 0, 0); if (r < 1) bury_job(j, 0); /* out of memory, so bury it */ @@ -1684,7 +1690,7 @@ h_accept(const int fd, const short which, struct event *ev) void prot_init() { - start_time = time(NULL); + started_at = now_usec(); memset(op_ct, 0, sizeof(op_ct)); ms_init(&tubes, NULL, NULL); @@ -1697,7 +1703,7 @@ void prot_replay_binlog(job binlog_jobs) { job j, nj; - unsigned int delay; + usec delay; int r; for (j = binlog_jobs->next ; j != binlog_jobs ; j = nj) { @@ -1710,7 +1716,9 @@ prot_replay_binlog(job binlog_jobs) bury_job(j, 0); break; case JOB_STATE_DELAYED: - if (start_time < j->deadline) delay = j->deadline - start_time; + if (started_at < j->deadline_at) { + delay = j->deadline_at - started_at; + } /* fall through */ default: r = enqueue_job(j, delay, 0); diff --git a/sh-tests/binlog-diskfull-delete.sh b/sh-tests/binlog-diskfull-delete.sh index 097ab854..dcdc87b3 100755 --- a/sh-tests/binlog-diskfull-delete.sh +++ b/sh-tests/binlog-diskfull-delete.sh @@ -74,8 +74,6 @@ put 0 0 100 50 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx put 0 0 100 50 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx -put 0 0 100 50 -xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx EOF diff - "$out1" </dev/null; then fi $nc $server $port < "$out2" -delete 7 delete 8 -delete 9 +delete 11 delete 12 delete 13 delete 14 -delete 15 EOF diff - "$out2" <