Skip to content

Commit

Permalink
Reserve timeout, part 2.
Browse files Browse the repository at this point in the history
If the safety margin arrives while the client is waiting on a reserve
command, return the timeout response.
  • Loading branch information
Keith Rarick committed Mar 11, 2008
1 parent dc266b8 commit d53c996
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
12 changes: 8 additions & 4 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,16 @@ has_reserved_job(conn c)
int
conn_set_evq(conn c, const int events, evh handler)
{
int r;
int r, margin = 0;
struct timeval tv = {0, 0};

event_set(&c->evq, c->fd, events, handler, c);

if (has_reserved_job(c)) tv.tv_sec = soonest_job(c)->deadline - time(NULL);
if (conn_waiting(c)) margin = 1;
if (has_reserved_job(c)) {
time_t t = soonest_job(c)->deadline - time(NULL) - margin;
tv.tv_sec = t > 0 ? t : 0;
}

r = event_add(&c->evq, has_reserved_job(c) ? &tv : NULL);
if (r == -1) return twarn("event_add() err %d", errno), -1;
Expand All @@ -164,12 +168,12 @@ conn_update_evq(conn c, const int events)
{
int r;

if (!c) return -1;
if (!c) return twarnx("c is NULL"), -1;

/* If it's been added, try to delete it first */
if (c->evq.ev_base) {
r = event_del(&c->evq);
if (r == -1) return -1;
if (r == -1) return twarn("event_del() err %d", errno), -1;
}

return conn_set_evq(c, events, c->evq.ev_callback);
Expand Down
2 changes: 2 additions & 0 deletions conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
#define CONN_TYPE_WORKER 2
#define CONN_TYPE_WAITING 4

#define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)

typedef struct conn *conn;

struct conn {
Expand Down
16 changes: 10 additions & 6 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,19 @@ is reserved for the client, the client has limited time to run the job before
the job times out, when the server will put the job back into the ready queue.
The time available can be found by asking the server for the job's stats.

If the client has previously reserved job J and issues a reserve command when
J has less than one second left in its ttr interval and there are no ready
jobs, the server will respond with:
During the TTR of a reserved job, the last second is reserved by the server as
a safety margin during which the client will not be made to wait for another
job. If the client issues a reserve command during this time, or if the safety
mmargin arrives while the client is waiting on a reserve command, the server
will respond with:

TIMEOUT\r\n

This gives the client a chance to delete or release job J before the server
automatically releases it. Otherwise, the only response is in the form of a
text line followed by the job body:
This gives the client a chance to delete or release its reserved job before
the server automatically releases it.

Otherwise, the only response to this command is a successful reservation in
the form of a text line followed by the job body:

RESERVED <id> <bytes>\r\n
<data>\r\n
Expand Down
22 changes: 16 additions & 6 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ reply(conn c, const char *line, int len, int state)
{
int r;

if (!c) return;

r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);

Expand Down Expand Up @@ -273,7 +275,8 @@ remove_waiting_conn(conn c)
tube t;
size_t i;

if (!(c->type & CONN_TYPE_WAITING)) return NULL;
if (!conn_waiting(c)) return NULL;

c->type &= ~CONN_TYPE_WAITING;
global_stat.waiting_ct--;
for (i = 0; i < c->watch.used; i++) {
Expand Down Expand Up @@ -802,12 +805,12 @@ wait_for_job(conn c)
{
int r;

c->state = STATE_WAIT;
enqueue_waiting_conn(c);

/* this conn is waiting, but we want to know if they hang up */
r = conn_update_evq(c, EV_READ | EV_PERSIST);
if (r == -1) return twarnx("update events failed"), conn_close(c);

c->state = STATE_WAIT;
enqueue_waiting_conn(c);
}

typedef int(*fmt_fn)(char *, size_t, void *);
Expand Down Expand Up @@ -1273,18 +1276,25 @@ dispatch_cmd(conn c)
static void
h_conn_timeout(conn c)
{
int r;
int r, should_timeout = 0;
job j;

if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;

while ((j = soonest_job(c))) {
if (j->deadline > time(NULL)) return;
if (j->deadline > time(NULL)) break;
timeout_ct++; /* stats */
j->timeout_ct++;
r = enqueue_job(remove_this_reserved_job(c, j), 0);
if (!r) bury_job(j); /* there was no room in the queue, so bury it */
r = conn_update_evq(c, c->evq.ev_events);
if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
}

if (should_timeout) {
dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
return reply_msg(remove_waiting_conn(c), MSG_TIMEOUT);
}
}

void
Expand Down

0 comments on commit d53c996

Please sign in to comment.