Skip to content

Commit

Permalink
Hook up the delete command and fix a bug.
Browse files Browse the repository at this point in the history
If we read in a complete command as the trailing bytes of another command, be
sure to execute it.

Preallocate connection structs and keep a list of them. That way it's easy to
tell if a given conn pointer is valid or has been freed.
  • Loading branch information
Keith Rarick committed Sep 25, 2007
1 parent 4de7982 commit 6ea4e18
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 23 deletions.
84 changes: 71 additions & 13 deletions beanstalkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ scan_line_end(const char *s, int size)
return 0;
}

static int
cmd_len(conn c)
{
return scan_line_end(c->cmd, c->cmd_read);
}

/* parse the command line */
static int
which_cmd(conn c)
Expand All @@ -87,12 +93,16 @@ which_cmd(conn c)
}

/* Copy up to data_size trailing bytes into the job, then the rest into the cmd
* buffer. If c->in_job exists, this assumes that c->in_job->data is empty. */
* buffer. If c->in_job exists, this assumes that c->in_job->data is empty.
* This function is idempotent(). */
static void
fill_extra_data(conn c)
{
int extra_bytes, job_data_bytes = 0, cmd_bytes;

if (!c->fd) return; /* the connection was closed */
if (!c->cmd_len) return; /* we don't have a complete command */

/* how many extra bytes did we read? */
extra_bytes = c->cmd_read - c->cmd_len;
fprintf(stderr, "have %d extra bytes\n", extra_bytes);
Expand All @@ -109,6 +119,7 @@ fill_extra_data(conn c)
cmd_bytes = extra_bytes - job_data_bytes;
memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
c->cmd_read = cmd_bytes;
c->cmd_len = 0; /* we no longer know the length of the new command */
}

static void
Expand Down Expand Up @@ -145,12 +156,22 @@ maybe_enqueue_incoming_job(conn c)
c->state = STATE_WANTDATA;
}

static void
wait_for_job(conn c)
{
/* this conn is waiting, so we are not interested in reading or writing */
event_del(&c->evq);
c->state = STATE_WAIT;
enqueue_waiting_conn(c);
}

static void
do_cmd(conn c)
{
char type;
char *size_buf, *end_buf;
unsigned int pri, data_size;
unsigned long long int id;

/* NUL-terminate this string so we can use strtol and friends */
c->cmd[c->cmd_len - 2] = '\0';
Expand All @@ -159,6 +180,9 @@ do_cmd(conn c)
if (strlen(c->cmd) != c->cmd_len - 2) return conn_close(c);

type = which_cmd(c);

/* do not return inside this switch unless you close the connection */
/* likewise, be sure to return if you close the connection */
switch (type) {
case OP_PUT:
errno = 0;
Expand Down Expand Up @@ -193,52 +217,72 @@ do_cmd(conn c)

fprintf(stderr, "got reserve cmd: %s\n", c->cmd);

fill_extra_data(c);

warn("looking for a job");

/* keep any existing job, but take one if necessary */
//c->reserved_job = c->reserved_job ? : pq_take(ready_q);

//fprintf(stderr, "found job %p\n", c->reserved_job);

if (c->reserved_job) return reply_job(c, MSG_RESERVED);
/* does this conn already have a job reserved? */
if (c->reserved_job) {
reply_job(c, MSG_RESERVED);
break;
}

enqueue_waiting_conn(c);
/* try to get a new job for this guy */
wait_for_job(c);
process_queue();
break;
case OP_DELETE:
reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
errno = 0;
id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
if (errno) return conn_close(c);

if (!c->reserved_job || id != c->reserved_job->id) {
reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
break;
}

free(c->reserved_job);
c->reserved_job = NULL;

reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_WANTCOMMAND);
break;
case OP_STATS:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_STATS_LEN + 2) return conn_close(c);
warn("got stats command");
conn_close(c);
return conn_close(c);
break;
case OP_JOBSTATS:
warn("got job stats command");
conn_close(c);
return conn_close(c);
break;
default:
/* unknown command -- protocol error */
fprintf(stderr, "got unknown cmd: %s\n", c->cmd);
return conn_close(c);
}
fill_extra_data(c);
}

static void
reset_conn(conn c)
{
int r;

fprintf(stderr, "%d: resetting back to STATE_WANTCOMMAND\n", c->fd);

r = conn_update_evq(c, EV_READ | EV_PERSIST, NULL);
if (r == -1) return warn("update flags failed"), conn_close(c);

c->reply_sent = 0; /* now that we're done, reset this */
c->state = STATE_WANTCOMMAND;
}

#define cmd_is_ready(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))

static void
h_conn(const int fd, const short which, conn c)
{
Expand All @@ -254,6 +298,9 @@ h_conn(const int fd, const short which, conn c)

fprintf(stderr, "%d: got event %d\n", fd, which);

/* Do not return inside this switch unless you close the connection or got
* no data. Likewise, be sure to return if you might close the connection.
* */
switch (c->state) {
case STATE_WANTCOMMAND:
r = read(fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
Expand All @@ -262,13 +309,18 @@ h_conn(const int fd, const short which, conn c)

c->cmd_read += r; /* we got some bytes */

c->cmd_len = scan_line_end(c->cmd, c->cmd_read); /* find the EOL */
c->cmd_len = cmd_len(c); /* find the EOL */

/* yay, complete command line */
if (c->cmd_len) return do_cmd(c);
if (c->cmd_len) {
do_cmd(c);
break;
}

/* c->cmd_read > LINE_BUF_SIZE can't happen */

/* command line too long? */
if (c->cmd_read >= LINE_BUF_SIZE) return conn_close(c);
if (c->cmd_read == LINE_BUF_SIZE) return conn_close(c);

/* otherwise we have an incomplete line, so just keep waiting */
break;
Expand All @@ -295,7 +347,7 @@ h_conn(const int fd, const short which, conn c)

/* (c->reply_sent > c->reply_len) can't happen */

if (c->reply_sent == c->reply_len) return reset_conn(c);
if (c->reply_sent == c->reply_len) reset_conn(c);

/* otherwise we sent an incomplete reply, so just keep waiting */
break;
Expand All @@ -321,11 +373,16 @@ h_conn(const int fd, const short which, conn c)
/* (j->data_xfer > j->data_size) can't happen */

/* are we done? */
if (j->data_xfer == j->data_size) return reset_conn(c);
if (j->data_xfer == j->data_size) reset_conn(c);

/* otherwise we sent incomplete data, so just keep waiting */
break;
}

/* If we got here, it means we still have an open connection and we're
* ready to run a command. So we should check if we already got more
* commands to process. */
while (cmd_is_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
}

static void
Expand Down Expand Up @@ -368,6 +425,7 @@ main(int argc, char **argv)

drop_root();
prot_init();
conn_init();
daemonize();
event_init();
set_sig_handlers();
Expand Down
3 changes: 3 additions & 0 deletions beanstalkd.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
#define MSG_INSERTED "INSERTED\r\n"
#define MSG_NOT_INSERTED "NOT_INSERTED\r\n"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_DELETED "DELETED\r\n"

#define MSG_INSERTED_LEN CONSTSTRLEN(MSG_INSERTED)
#define MSG_NOT_INSERTED_LEN CONSTSTRLEN(MSG_NOT_INSERTED)
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)

#define CMD_PUT "put "
#define CMD_PEEK "peek "
Expand All @@ -24,6 +26,7 @@
#define CMD_JOBSTATS "stats "

#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)

#endif /*beanstalk_h*/
54 changes: 51 additions & 3 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,61 @@
#include "util.h"
#include "prot.h"

static struct conn pool[MAX_CONNECTIONS];

/* linked list of free connections. See struct conn's next field in conn.h */
static conn pool_front, pool_rear;

static int
pool_conn_p()
{
return !!pool_front;
}

static conn
conn_alloc()
{
conn c;

if (!pool_conn_p()) return NULL;

/* remove it from the list */
c = pool_front;
pool_front = c->next;
c->next = NULL;

return c;
}

static void
conn_free(conn c)
{
c->fd = 0;
c->next = NULL;
if (pool_conn_p()) {
pool_rear->next = c;
} else {
pool_front = c;
}
pool_rear = c;
}

void
conn_init()
{
int i;

pool_front = pool_rear = NULL;
for (i = 0; i < MAX_CONNECTIONS; i++) conn_free(&pool[i]);
}

conn
make_conn(int fd, char start_state)
{
conn c;

c = malloc(sizeof(struct conn));
if (!c) return warn("OOM"), NULL;
c = conn_alloc();
if (!c) return NULL;

c->fd = fd;
c->state = start_state;
Expand Down Expand Up @@ -59,5 +107,5 @@ conn_close(conn c)
if (c->reserved_job) enqueue_job(c->reserved_job);
if (c->in_job) free(c->in_job);

free(c);
conn_free(c);
}
8 changes: 7 additions & 1 deletion conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
#include "event.h"
#include "job.h"

/* We will stop accepting new connections at 8 simultaneous Ki-connections. */
#define MAX_CONNECTIONS (8 * 1024)

#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
#define STATE_SENDJOB 2
#define STATE_SENDWORD 3
#define STATE_WAIT 4

/* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
* MUST be enough to hold the longest possible reply line, which is currently
Expand Down Expand Up @@ -45,9 +49,11 @@ struct conn {
char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */
job in_job;
job reserved_job;
conn next_waiting; /* linked list of connections waiting for a job */
conn next; /* linked list of connections */
};

void conn_init();

conn make_conn(int fd, char start_state);

int conn_update_evq(conn c, const int flags, evh handler);
Expand Down
20 changes: 14 additions & 6 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

static pq ready_q;

/* linked list of waiting connections. See struct conn's next field in conn.h */
static conn waiting_conn_front, waiting_conn_rear;

static int
Expand Down Expand Up @@ -65,9 +66,10 @@ next_waiting_conn()

if (!waiting_conn_p()) return NULL;

/* remove it from the list */
c = waiting_conn_front;
waiting_conn_front = c->next_waiting;
c->next_waiting = NULL; /* remove it from the list */
waiting_conn_front = c->next;
c->next = NULL;

return c;
}
Expand All @@ -77,8 +79,12 @@ process_queue()
{
job j;

warn("processing queue");
while (waiting_conn_p() && (j = pq_take(ready_q))) {
fprintf(stderr, "processing queue conns=%d j=%p\n", waiting_conn_p(), &ready_q[0]);
while (waiting_conn_p()) {
j = pq_take(ready_q);
fprintf(stderr, "j=%p\n", j);
if (!j) return;
warn("reserving job");
reserve_job(next_waiting_conn(), j);
}
}
Expand All @@ -96,9 +102,10 @@ enqueue_job(job j)
void
enqueue_waiting_conn(conn c)
{
c->next_waiting = NULL;
fprintf(stderr, "%d: putting in wait queue\n", c->fd);
c->next = NULL;
if (waiting_conn_p()) {
waiting_conn_rear->next_waiting = c;
waiting_conn_rear->next = c;
} else {
waiting_conn_front = c;
}
Expand All @@ -109,4 +116,5 @@ void
prot_init()
{
ready_q = make_pq(HEAP_SIZE);
waiting_conn_front = waiting_conn_rear = NULL;
}

0 comments on commit 6ea4e18

Please sign in to comment.