Skip to content

Commit

Permalink
Finish #852 - add a delay feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Nov 8, 2007
1 parent 2ae48bc commit 024fe3c
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 5 deletions.
20 changes: 20 additions & 0 deletions beanstalkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,24 @@ h_conn(const int fd, const short which, conn c)
while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
}

static void
h_delay()
{
int r;
job j;
time_t t;

t = time(NULL);
while ((j = delay_q_peek())) {
if (j->deadline > t) break;
j = delay_q_take();
r = enqueue_job(j, 0);
if (!r) bury_job(j); /* there was no room in the queue, so bury it */
}

set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
}

static void
h_accept(const int fd, const short which, struct event *ev)
{
Expand All @@ -696,6 +714,8 @@ h_accept(const int fd, const short which, struct event *ev)
socklen_t addrlen;
struct sockaddr addr;

if (which == EV_TIMEOUT) return h_delay();

addrlen = sizeof addr;
cfd = accept(fd, &addr, &addrlen);
if (cfd == -1) {
Expand Down
4 changes: 2 additions & 2 deletions job.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ job_pri_cmp(job a, job b)
int
job_delay_cmp(job a, job b)
{
if (a->delay == b->delay) {
if (a->deadline == b->deadline) {
/* we can't just subtract because id has too many bits */
if (a->id > b->id) return 1;
if (a->id < b->id) return -1;
return 0;
}
return a->delay - b->delay;
return a->deadline - b->deadline;
}

job
Expand Down
20 changes: 17 additions & 3 deletions net.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
static int listen_socket = -1;
static struct event listen_evq;
static evh accept_handler;
static time_t main_deadline = 0;
static int brakes_are_on = 1;

int
make_server_socket(int host, int port)
Expand Down Expand Up @@ -51,6 +53,8 @@ brake()
{
int r;

if (brakes_are_on) return;
brakes_are_on = 1;
twarnx("too many connections; putting on the brakes");

r = event_del(&listen_evq);
Expand All @@ -65,17 +69,27 @@ unbrake(evh h)
{
int r;

if (!brakes_are_on) return;
brakes_are_on = 0;
twarnx("releasing the brakes");

accept_handler = h ? : accept_handler;
event_set(&listen_evq, listen_socket, EV_READ | EV_PERSIST,
accept_handler, &listen_evq);

errno = 0;
r = event_add(&listen_evq, NULL);
if (r == -1) twarn("event_add()");
set_main_timeout(main_deadline);

r = listen(listen_socket, 1024);
if (r == -1) twarn("listen()");
}

void
set_main_timeout(time_t deadline)
{
int r;
struct timeval tv = {deadline - time(NULL), 0};

main_deadline = deadline;
r = event_add(&listen_evq, deadline ? &tv : NULL);
if (r == -1) twarn("event_add()");
}
1 change: 1 addition & 0 deletions net.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ int make_server_socket(int host, int port);

void brake();
void unbrake(evh h);
void set_main_timeout(time_t deadline);

#endif /*net_h*/
7 changes: 7 additions & 0 deletions pq.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ pq_take(pq q)
return j;
}

job
pq_peek(pq q)
{
if (q->used == 0) return NULL;
return q->heap[0];
}

job
pq_find(pq q, unsigned long long int id)
{
Expand Down
3 changes: 3 additions & 0 deletions pq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ int pq_give(pq q, job j);
/* return a job if the queue contains jobs, else NULL */
job pq_take(pq q);

/* return a job if the queue contains jobs, else NULL */
job pq_peek(pq q);

/* return a job that matches the given id, else NULL */
/* This is O(n), so don't do it much. */
job pq_find(pq q, unsigned long long int id);
Expand Down
16 changes: 16 additions & 0 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "conn.h"
#include "util.h"
#include "reserve.h"
#include "net.h"

static pq ready_q;
static pq delay_q;
Expand Down Expand Up @@ -90,9 +91,11 @@ enqueue_job(job j, unsigned int delay)
int r;

if (delay) {
j->deadline = time(NULL) + delay;
r = pq_give(delay_q, j);
if (!r) return 0;
j->state = JOB_STATE_DELAY;
set_main_timeout(pq_peek(delay_q)->deadline);
} else {
r = pq_give(ready_q, j);
if (!r) return 0;
Expand All @@ -103,6 +106,18 @@ enqueue_job(job j, unsigned int delay)
return 1;
}

job
delay_q_peek()
{
return pq_peek(delay_q);
}

job
delay_q_take()
{
return pq_take(delay_q);
}

void
bury_job(job j)
{
Expand Down Expand Up @@ -157,6 +172,7 @@ job
peek_job(unsigned long long int id)
{
return pq_find(ready_q, id) ? :
pq_find(delay_q, id) ? :
find_reserved_job(id) ? :
find_reserved_job_in_list(&wait_queue, id) ? :
find_buried_job(id);
Expand Down
2 changes: 2 additions & 0 deletions prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ conn remove_waiting_conn(conn c);
void enqueue_waiting_conn(conn c);

int enqueue_job(job j, unsigned int delay);
job delay_q_peek();
job delay_q_take();
void bury_job(job j);
int kick_job();
void process_queue();
Expand Down

0 comments on commit 024fe3c

Please sign in to comment.