Skip to content

Commit

Permalink
Refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Feb 21, 2008
1 parent 6026eb8 commit d90f71b
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 196 deletions.
31 changes: 30 additions & 1 deletion conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "net.h"
#include "util.h"
#include "prot.h"
#include "reserve.h"

/* Doubly-linked list of free connections. */
static struct conn pool = { &pool, &pool, 0 };
Expand Down Expand Up @@ -112,6 +111,12 @@ count_cur_workers()
return cur_worker_ct;
}

static int
has_reserved_job(conn c)
{
return job_list_any_p(&c->reserved_jobs);
}

int
conn_set_evq(conn c, const int events, evh handler)
{
Expand Down Expand Up @@ -173,6 +178,30 @@ conn_insert(conn head, conn c)
head->prev = c;
}

/* return the reserved job with the earliest deadline,
* or NULL if there's no reserved job */
job
soonest_job(conn c)
{
job j, soonest = NULL;

for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (j->deadline <= (soonest ? : j)->deadline) soonest = j;
}
return soonest;
}

int
has_reserved_this_job(conn c, job needle)
{
job j;

for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (needle == j) return 1;
}
return 0;
}

void
conn_close(conn c)
{
Expand Down
5 changes: 3 additions & 2 deletions conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ struct conn {
struct job reserved_jobs; /* doubly-linked list header */
};

void conn_init();

conn make_conn(int fd, char start_state);

int conn_set_evq(conn c, const int events, evh handler);
Expand All @@ -97,4 +95,7 @@ int count_cur_workers();
void conn_set_producer(conn c);
void conn_set_worker(conn c);

job soonest_job(conn c);
int has_reserved_this_job(conn c, job j);

#endif /*conn_h*/
106 changes: 93 additions & 13 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "job.h"
#include "conn.h"
#include "util.h"
#include "reserve.h"
#include "net.h"
#include "version.h"

Expand All @@ -48,6 +47,8 @@
#define CMD_STATS "stats"
#define CMD_JOBSTATS "stats "

#define CONSTSTRLEN(m) (sizeof(m) - 1)

#define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
Expand All @@ -60,6 +61,7 @@

#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
Expand Down Expand Up @@ -134,6 +136,12 @@ static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
stats_ct = 0, timeout_ct = 0;

static unsigned int cur_reserved_ct = 0;


/* Doubly-linked list of connections with at least one reserved job. */
static struct conn running = { &running, &running, 0 };

#ifdef DEBUG
static const char * op_names[] = {
"<unknown>",
Expand Down Expand Up @@ -198,7 +206,7 @@ reply_line(conn c, int state, const char *fmt, ...)
return reply(c, c->reply_buf, r, state);
}

void
static void
reply_job(conn c, job j, const char *word)
{
/* tell this connection which job to send */
Expand All @@ -218,6 +226,17 @@ remove_waiting_conn(conn c)
return conn_remove(c);
}

static void
reserve_job(conn c, job j)
{
j->deadline = time(NULL) + j->ttr;
cur_reserved_ct++; /* stats */
conn_insert(&running, c);
j->state = JOB_STATE_RESERVED;
job_insert(&c->reserved_jobs, j);
return reply_job(c, j, MSG_RESERVED);
}

static void
process_queue()
{
Expand All @@ -231,7 +250,7 @@ process_queue()
}
}

int
static int
enqueue_job(job j, unsigned int delay)
{
int r;
Expand All @@ -252,6 +271,30 @@ enqueue_job(job j, unsigned int delay)
return 1;
}

static void
bury_job(job j)
{
job_insert(&graveyard, j);
buried_ct++;
j->state = JOB_STATE_BURIED;
j->bury_ct++;
}

void
enqueue_reserved_jobs(conn c)
{
int r;
job j;

while (job_list_any_p(&c->reserved_jobs)) {
j = job_remove(c->reserved_jobs.next);
r = enqueue_job(j, 0);
if (!r) bury_job(j);
cur_reserved_ct--;
if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
}
}

static job
delay_q_peek()
{
Expand All @@ -264,15 +307,6 @@ delay_q_take()
return pq_take(delay_q);
}

void
bury_job(job j)
{
job_insert(&graveyard, j);
buried_ct++;
j->state = JOB_STATE_BURIED;
j->bury_ct++;
}

static job
remove_this_buried_job(job j)
{
Expand Down Expand Up @@ -381,6 +415,36 @@ enqueue_waiting_conn(conn c)
conn_insert(&wait_queue, conn_remove(c) ? : c);
}

static job
find_reserved_job_in_conn(conn c, unsigned long long int id)
{
job j;

for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
if (j->id == id) return j;
}
return NULL;
}

static job
find_reserved_job_in_list(conn list, unsigned long long int id)
{
job j;
conn c;

for (c = list->next; c != list; c = c->next) {
j = find_reserved_job_in_conn(c, id);
if (j) return j;
}
return NULL;
}

static job
find_reserved_job(unsigned long long int id)
{
return find_reserved_job_in_list(&running, id);
}

static job
peek_job(unsigned long long int id)
{
Expand Down Expand Up @@ -535,7 +599,7 @@ fmt_stats(char *buf, size_t size, void *x)
return snprintf(buf, size, STATS_FMT,
get_urgent_job_ct(),
get_ready_job_ct(),
get_reserved_job_ct(),
cur_reserved_ct,
get_delayed_job_ct(),
get_buried_job_ct(),
put_ct,
Expand Down Expand Up @@ -669,6 +733,22 @@ maybe_enqueue_incoming_job(conn c)
c->state = STATE_WANTDATA;
}

/* j can be NULL */
static job
remove_this_reserved_job(conn c, job j)
{
j = job_remove(j);
if (j) cur_reserved_ct--;
if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
return j;
}

static job
remove_reserved_job(conn c, unsigned long long int id)
{
return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
}

static void
dispatch_cmd(conn c)
{
Expand Down
9 changes: 1 addition & 8 deletions prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,13 @@
#include "job.h"
#include "conn.h"

#define CONSTSTRLEN(m) (sizeof(m) - 1)

#define URGENT_THRESHOLD 1024

#define MSG_RESERVED "RESERVED"

void prot_init();

void reply_job(conn c, job j, const char *word);

conn remove_waiting_conn(conn c);

int enqueue_job(job j, unsigned int delay);
void bury_job(job j);
void enqueue_reserved_jobs(conn c);

void enter_drain_mode(int sig);
void h_accept(const int fd, const short which, struct event *ev);
Expand Down
Loading

0 comments on commit d90f71b

Please sign in to comment.