Skip to content

Commit

Permalink
Implement tubes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Feb 26, 2008
1 parent 3559424 commit c1b09c4
Show file tree
Hide file tree
Showing 13 changed files with 634 additions and 70 deletions.
32 changes: 28 additions & 4 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,40 @@ static void
conn_free(conn c)
{
c->fd = 0;
TUBE_ASSIGN(c->use, NULL);
conn_insert(&pool, c);
}

static void
inc(ms a, tube t, size_t i)
{
tube_iref(t);
}

static void
dec(ms a, tube t, size_t i)
{
tube_dref(t);
}

conn
make_conn(int fd, char start_state)
make_conn(int fd, char start_state, tube use, tube watch)
{
job j;
conn c;

c = conn_alloc();
if (!c) return twarn("OOM"), NULL;

ms_init(&c->watch, (ms_event_fn) inc, (ms_event_fn) dec);
if (!ms_append(&c->watch, watch)) {
conn_free(c);
return twarn("OOM"), NULL;
}

c->use = NULL; /* initialize */
TUBE_ASSIGN(c->use, use);

c->fd = fd;
c->state = start_state;
c->type = 0;
Expand Down Expand Up @@ -149,7 +171,7 @@ conn_update_evq(conn c, const int events)
return conn_set_evq(c, events, c->evq.ev_callback);
}

int
static int
conn_list_any_p(conn head)
{
return head->next != head || head->prev != head;
Expand Down Expand Up @@ -209,10 +231,10 @@ conn_close(conn c)

close(c->fd);

free(c->in_job);
job_free(c->in_job);

/* was this a peek or stats command? */
if (!has_reserved_this_job(c, c->out_job)) free(c->out_job);
if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);

c->in_job = c->out_job = NULL;

Expand All @@ -226,5 +248,7 @@ conn_close(conn c)
conn_remove(c);
if (has_reserved_job(c)) enqueue_reserved_jobs(c);

ms_clear(&c->watch);

conn_free(c);
}
10 changes: 8 additions & 2 deletions conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#define conn_h

#include "event.h"
#include "ms.h"
#include "job.h"
#include "tube.h"

#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
Expand All @@ -44,6 +46,9 @@
#define OP_STATS 8
#define OP_JOBSTATS 9
#define OP_PEEK 10
#define OP_USE 11
#define OP_WATCH 12
#define OP_IGNORE 13

/* CONN_TYPE_* are bit masks */
#define CONN_TYPE_PRODUCER 1
Expand Down Expand Up @@ -74,16 +79,17 @@ struct conn {
job out_job;
int out_job_sent;
struct job reserved_jobs; /* doubly-linked list header */
tube use;
struct ms watch;
};

conn make_conn(int fd, char start_state);
conn make_conn(int fd, char start_state, tube use, tube watch);

int conn_set_evq(conn c, const int events, evh handler);
int conn_update_evq(conn c, const int flags);

void conn_close(conn c);

int conn_list_any_p(conn head);
conn conn_remove(conn c);
void conn_insert(conn head, conn c);

Expand Down
74 changes: 71 additions & 3 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ were received and sends responses in the same order. All integers in the
protocol are formatted in decimal and (unless otherwise indicated)
nonnegative.

Names, in this protocol, are ASCII strings. They may contain letters (A-Z and
a-z), numerals (0-9), hyphen ("-"), plus ("+"), slash ("/"), semicolon (";"),
dot ("."), dollar-sign ("$"), and parentheses ("(" and ")"), but they may not
begin with a hyphen. They are terminated by white space (either a space char or
end of line). Each name must be at least one character long.

The protocol contains two kinds of data: text lines and unstructured chunks of
data. Text lines are used for client commands and server responses. Chunks are
used to transfer job bodies and stats information. Each job body is an opaque
Expand Down Expand Up @@ -106,17 +112,32 @@ Here is a picture with more possibilities:
`--------> *poof*


The system has one or more tubes. Each tube consists of a ready queue and a
delay queue. Each job spends its entire life in one tube. Consumers can show
interest in tubes by sending the "watch" command; they can show disinterest by
sending the "ignore" command. This set of interesting tubes is said to be a
consumer's "watch list". When a client reserves a job, it may come from any of
the tubes in its watch list.

When a client connects, its watch list is initially just the tube named
"default". If it submits jobs without having sent a "use" command, they will
live in the tube named "default".

Tubes are created on demand whenever they are referenced. If a tube is empty
(that is, it contains no ready, delayed, or buried jobs) and no client refers
to it, it will be deleted.

Producer Command
----------------
Producer Commands
-----------------

The "put" command is for any process that wants to insert a job into the queue.
It comprises a command line followed by the job body:

put <pri> <delay> <ttr> <bytes>\r\n
<data>\r\n

It inserts a job into the queue.
It inserts a job into the client's currently used tube (see the "use" command
below).

- <pri> is an integer < 2**32. Jobs with smaller priority values will be
scheduled before jobs with larger priorities. The most urgent priority is 0;
Expand Down Expand Up @@ -150,6 +171,19 @@ may be:

- <id> is the integer id of the new job

The "use" command is for producers. Subsequent put commands will put jobs into
the tube specified by this command. If no use command has been issued, jobs
will be put into the tube named "default".

use <tube>\r\n

- <tube> is a name at most 200 bytes. It specifies the tube to use. If the
tube does not exist, it will be created.

The only reply is:

USING\r\n

Worker Commands
---------------

Expand Down Expand Up @@ -239,6 +273,36 @@ There are two possible responses:

- "NOT_FOUND\r\n" if the job does not exist or is not reserved by the client.

The "watch" command adds the named tube to the watch list for the current
connection. A reserve command will take a job from any of the tubes in the
watch list. For each new connection, the watch list initially consists of one
tube, named "default".

watch <tube>\r\n

- <tube> is a name at most 200 bytes. It specifies a tube to add to the watch
list.

The reply is:

WATCHING <count>\r\n

- <count> is the integer number of tubes currently in the watch list.

The "ignore" command is for consumers. It removes the named tube from the
watch list for the current connection.

ignore <tube>\r\n

The reply is one of:

- "WATCHING <count>\r\n" to indicate success.

- <count> is the integer number of tubes currently in the watch list.

- "NOT_IGNORED\r\n" if the client attempts to ignore the only tube in its
watch list.

Other Commands
--------------

Expand Down Expand Up @@ -315,6 +379,8 @@ scalars.

- "id" is the job id

- "tube" is the name of the tube that contains this job

- "state" is "ready" or "delayed" or "reserved" or "buried"

- "age" is the time in seconds since the put command that created this job.
Expand Down Expand Up @@ -366,6 +432,8 @@ scalars.

- "total-jobs" is the cumulative count of jobs created.

- "current-tubes" is the number of currently-existing tubes.

- "current-connections" is the number of currently open connections.

- "current-producers" is the number of open connections that have each
Expand Down
20 changes: 15 additions & 5 deletions job.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string.h>

#include "job.h"
#include "tube.h"
#include "util.h"

static unsigned long long int next_id = 1;
Expand All @@ -37,12 +38,14 @@ allocate_job(int body_size)
j->timeout_ct = j->release_ct = j->bury_ct = j->kick_ct = 0;
j->body_size = body_size;
j->next = j->prev = j; /* not in a linked list */
j->tube = NULL;

return j;
}

job
make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size)
make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size,
tube tube)
{
job j;

Expand All @@ -53,10 +56,18 @@ make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size)
j->pri = pri;
j->delay = delay;
j->ttr = ttr;
TUBE_ASSIGN(j->tube, tube);

return j;
}

void
job_free(job j)
{
if (j) TUBE_ASSIGN(j->tube, NULL);
free(j);
}

int
job_pri_cmp(job a, job b)
{
Expand Down Expand Up @@ -91,10 +102,9 @@ job_copy(job j)
n = malloc(sizeof(struct job) + j->body_size);
if (!n) return twarnx("OOM"), NULL;

n->id = j->id;
n->pri = j->pri;
n->body_size = j->body_size;
memcpy(n->body, j->body, j->body_size);
memcpy(n, j, sizeof(struct job) + j->body_size);
n->next = n->prev = n; /* not in a linked list */
TUBE_ASSIGN(n->tube, j->tube);

return n;
}
Expand Down
12 changes: 8 additions & 4 deletions job.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@

#include <time.h>

typedef struct job *job;
typedef int(*job_cmp_fn)(job, job);

#include "tube.h"

#define JOB_STATE_INVALID 0
#define JOB_STATE_READY 1
#define JOB_STATE_RESERVED 2
#define JOB_STATE_BURIED 3
#define JOB_STATE_DELAYED 4

typedef struct job *job;

struct job {
job prev, next; /* linked list of jobs */
unsigned long long int id;
Expand All @@ -42,15 +45,16 @@ struct job {
unsigned int release_ct;
unsigned int bury_ct;
unsigned int kick_ct;
tube tube;
char state;
char body[];
};

job allocate_job(int body_size);
job make_job(unsigned int pri, unsigned int delay, unsigned int ttr,
int body_size);
int body_size, tube tube);
void job_free(job j);

typedef int(*job_cmp_fn)(job, job);
int job_pri_cmp(job a, job b);
int job_delay_cmp(job a, job b);

Expand Down
Loading

0 comments on commit c1b09c4

Please sign in to comment.