Skip to content

Commit

Permalink
Hook up peek. Requires some reorganization.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Sep 25, 2007
1 parent e28c4d3 commit c5d58cd
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 25 deletions.
43 changes: 29 additions & 14 deletions beanstalkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fill_extra_data(conn c)
if (c->in_job) {
job_data_bytes = min(extra_bytes, c->in_job->data_size);
memcpy(c->in_job->data, c->cmd + c->cmd_len, job_data_bytes);
c->in_job->data_xfer = job_data_bytes;
c->in_job_read = job_data_bytes;
}

/* how many bytes are left to go into the future cmd? */
Expand All @@ -127,6 +127,7 @@ enqueue_incoming_job(conn c)
job j = c->in_job;

c->in_job = NULL; /* the connection no longer owns this job */
c->in_job_read = 0;

/* check if the trailer is present and correct */
if (memcmp(j->data + j->data_size - 2, "\r\n", 2)) return conn_close(c);
Expand All @@ -146,7 +147,7 @@ maybe_enqueue_incoming_job(conn c)
job j = c->in_job;

/* do we have a complete job? */
if (j->data_xfer == j->data_size) return enqueue_incoming_job(c);
if (c->in_job_read == j->data_size) return enqueue_incoming_job(c);

/* otherwise we have incomplete data, so just keep waiting */
c->state = STATE_WANTDATA;
Expand All @@ -164,6 +165,7 @@ wait_for_job(conn c)
static void
dispatch_cmd(conn c)
{
job j;
char type;
char *size_buf, *end_buf;
unsigned int pri, data_size;
Expand Down Expand Up @@ -201,14 +203,25 @@ dispatch_cmd(conn c)

break;
case OP_PEEK:
errno = 0;
id = strtoull(c->cmd + CMD_PEEK_LEN, &end_buf, 10);
if (errno) return conn_close(c);

/* So, peek is annoying, because some other connection might free the
* job while we are still trying to write it out. So we copy it and
* then free the copy when it's done sending. */
j = job_copy(peek_job(id));

if (j) return reply_job(c, j, MSG_FOUND);

reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
break;
case OP_RESERVE:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_RESERVE_LEN + 2) return conn_close(c);

/* does this conn already have a job reserved? */
if (c->reserved_job) return reply_job(c, MSG_RESERVED);
if (c->reserved_job) return reply_job(c, c->reserved_job, MSG_RESERVED);

/* try to get a new job for this guy */
wait_for_job(c);
Expand Down Expand Up @@ -261,8 +274,10 @@ reset_conn(conn c)
r = conn_update_evq(c, EV_READ | EV_PERSIST);
if (r == -1) return warn("update events failed"), conn_close(c);

/* we are done transferring the job, so reset */
if (c->reserved_job) c->reserved_job->data_xfer = 0;
/* was this a peek command? */
if (c->out_job != c->reserved_job) free(c->out_job);
c->out_job = NULL;

c->reply_sent = 0; /* now that we're done, reset this */
c->state = STATE_WANTCOMMAND;
}
Expand Down Expand Up @@ -297,13 +312,13 @@ handle_connection(conn c)
case STATE_WANTDATA:
j = c->in_job;

r = read(c->fd, j->data + j->data_xfer, j->data_size - j->data_xfer);
r = read(c->fd, j->data + c->in_job_read, j->data_size - c->in_job_read);
if (r == -1) return check_err(c, "read()");
if (r == 0) return conn_close(c); /* the client hung up */

j->data_xfer += r; /* we got some bytes */
c->in_job_read += r; /* we got some bytes */

/* (j->data_xfer > j->data_size) can't happen */
/* (j->in_job_read > j->data_size) can't happen */

maybe_enqueue_incoming_job(c);
break;
Expand All @@ -321,12 +336,12 @@ handle_connection(conn c)
/* otherwise we sent an incomplete reply, so just keep waiting */
break;
case STATE_SENDJOB:
j = c->reserved_job;
j = c->out_job;

iov[0].iov_base = c->reply + c->reply_sent;
iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
iov[1].iov_base = j->data + j->data_xfer;
iov[1].iov_len = j->data_size - j->data_xfer;
iov[1].iov_base = j->data + c->out_job_sent;
iov[1].iov_len = j->data_size - c->out_job_sent;

r = writev(c->fd, iov, 2);
if (r == -1) return check_err(c, "writev()");
Expand All @@ -335,14 +350,14 @@ handle_connection(conn c)
/* update the sent values */
c->reply_sent += r;
if (c->reply_sent >= c->reply_len) {
j->data_xfer += c->reply_sent - c->reply_len;
c->out_job_sent += c->reply_sent - c->reply_len;
c->reply_sent = c->reply_len;
}

/* (j->data_xfer > j->data_size) can't happen */
/* (c->out_job_sent > j->data_size) can't happen */

/* are we done? */
if (j->data_xfer == j->data_size) return reset_conn(c);
if (c->out_job_sent == j->data_size) return reset_conn(c);

/* otherwise we sent incomplete data, so just keep waiting */
break;
Expand Down
2 changes: 2 additions & 0 deletions beanstalkd.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#define MSG_INSERTED "INSERTED\r\n"
#define MSG_NOT_INSERTED "NOT_INSERTED\r\n"
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_DELETED "DELETED\r\n"

Expand All @@ -25,6 +26,7 @@
#define CMD_STATS "stats"
#define CMD_JOBSTATS "stats "

#define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
Expand Down
10 changes: 6 additions & 4 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ make_conn(int fd, char start_state)
c->fd = fd;
c->state = start_state;
c->cmd_read = 0;
c->in_job = NULL;
c->reserved_job = NULL;
c->in_job = c->out_job = c->reserved_job = NULL;
c->in_job_read = c->out_job_sent = 0;

return c;
}
Expand Down Expand Up @@ -121,9 +121,11 @@ conn_close(conn c)
event_del(&c->evq);

close(c->fd);

if (c->reserved_job) enqueue_job(c->reserved_job);
if (c->in_job) free(c->in_job);
c->in_job = c->reserved_job = NULL;
free(c->in_job);
if (c->out_job != c->reserved_job) free(c->out_job); /* peek command? */
c->in_job = c->out_job = c->reserved_job = NULL;

conn_remove(c);
conn_free(c);
Expand Down
3 changes: 3 additions & 0 deletions conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ struct conn {
int reply_sent;
char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */
job in_job;
int in_job_read;
job out_job;
int out_job_sent;
job reserved_job;
conn prev, next; /* linked list of connections */
};
Expand Down
20 changes: 19 additions & 1 deletion job.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* job.c - a job in the queue */

#include <stdlib.h>
#include <string.h>

#include "job.h"
#include "util.h"
Expand All @@ -18,11 +19,28 @@ make_job(unsigned int pri, int data_size)
j->id = next_id++;
j->pri = pri;
j->data_size = data_size;
j->data_xfer = 0;

return j;
}

job
job_copy(job j)
{
job n;

if (!j) return NULL;

n = malloc(sizeof(struct job) + j->data_size);
if (!n) return warn("OOM"), NULL;

n->id = j->id;
n->pri = j->pri;
n->data_size = j->data_size;
memcpy(n->data, j->data, j->data_size);

return n;
}

int
job_cmp(job a, job b)
{
Expand Down
3 changes: 2 additions & 1 deletion job.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ typedef struct job {
unsigned long long int id;
unsigned int pri;
int data_size;
int data_xfer;
unsigned char data[];
} *job;

job make_job(unsigned int pri, int data_size);

int job_cmp(job a, job b);

job job_copy(job j);

#endif /*job_h*/
1 change: 1 addition & 0 deletions pq.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* pq.c - priority queue */

#include <stdlib.h>
#include <stdio.h>

#include "pq.h"

Expand Down
16 changes: 12 additions & 4 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ reply(conn c, char *line, int len, int state)
}

void
reply_job(conn c, const char *word)
reply_job(conn c, job j, const char *word)
{
int r;
job j = c->reserved_job;

/* tell this connection which job to send */
c->out_job = j;
c->out_job_sent = 0;

r = snprintf(c->reply_buf, LINE_BUF_SIZE, "%s %lld %d\r\n",
word, j->id, j->pri);
Expand All @@ -53,7 +56,7 @@ static void
reserve_job(conn c, job j)
{
c->reserved_job = j;
return reply_job(c, MSG_RESERVED);
return reply_job(c, j, MSG_RESERVED);
}

static conn
Expand Down Expand Up @@ -86,7 +89,6 @@ enqueue_job(job j)
{
int r;

j->data_xfer = 0; /* the queue owns this job now, so make it sendable */
r = pq_give(ready_q, j);
if (r) return process_queue(), 1;
return 0;
Expand All @@ -98,6 +100,12 @@ enqueue_waiting_conn(conn c)
conn_insert(&wait_queue, c);
}

job
peek_job(unsigned long long int id)
{
return pq_find(ready_q, id);
}

void
prot_init()
{
Expand Down
4 changes: 3 additions & 1 deletion prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
void prot_init();

void reply(conn c, char *line, int len, int state);
void reply_job(conn c, const char *word);
void reply_job(conn c, job j, const char *word);

void enqueue_waiting_conn(conn c);

int enqueue_job(job j);
void process_queue();

job peek_job(unsigned long long int id);

#endif /*prot_h*/

0 comments on commit c5d58cd

Please sign in to comment.