diff --git a/.gitignore b/.gitignore
index f0f4f695..70231782 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@ tests/cutcheck*
tests/cutgen
stamp-h1
version.h
+.*.swp
diff --git a/Makefile.am b/Makefile.am
index d4902ae0..4bc76506 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -12,11 +12,12 @@ beanstalkd_SOURCES = beanstalkd.c \
primes.c \
prot.c \
tube.c \
+ binlog.c \
util.c
cutgen_CFLAGS = -D__LINUX__
-EXTRA_DIST = conn.h job.h ms.h net.h pq.h primes.h prot.h stat.h tube.h util.h
+EXTRA_DIST = conn.h job.h ms.h net.h pq.h primes.h prot.h stat.h tube.h util.h binlog.h
tests = $(abs_srcdir)/tests/test_conn.c \
$(abs_srcdir)/tests/test_job.c \
diff --git a/beanstalkd.c b/beanstalkd.c
index b4214049..a371809b 100644
--- a/beanstalkd.c
+++ b/beanstalkd.c
@@ -34,6 +34,7 @@
#include "util.h"
#include "prot.h"
#include "version.h"
+#include "binlog.h"
static char *user = NULL;
static int detach = 0;
@@ -93,6 +94,7 @@ su(const char *user) {
void
exit_cleanly(int sig)
{
+ binlog_close();
exit(0);
}
@@ -153,6 +155,7 @@ usage(char *msg, char *arg)
"\n"
"Options:\n"
" -d detach\n"
+ " -b DIR binlog directory\n"
" -l ADDR listen on address (default is 0.0.0.0)\n"
" -p PORT listen on port (default is 11300)\n"
" -u USER become user and group\n"
@@ -225,6 +228,9 @@ opts(int argc, char **argv)
case 'u':
user = argv[++i];
break;
+ case 'b':
+ binlog_dir = argv[++i];
+ break;
case 'h':
usage(NULL, NULL);
case 'v':
@@ -257,8 +263,11 @@ main(int argc, char **argv)
nudge_fd_limit();
unbrake((evh) h_accept);
+ prot_replay_binlog();
+ binlog_init();
event_dispatch();
+ binlog_close();
twarnx("got here for some reason");
return 0;
}
diff --git a/binlog.c b/binlog.c
new file mode 100644
index 00000000..d8351726
--- /dev/null
+++ b/binlog.c
@@ -0,0 +1,365 @@
+/* binlog.c - binary log implementation */
+
+/* Copyright (C) 2008 Graham Barr
+
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "tube.h"
+#include "job.h"
+#include "binlog.h"
+#include "util.h"
+#include "version.h"
+
+/* Maximum size, in bytes, that a single log file may reach before the writer
+ * switches to a new one (10 << 20 = 10 MiB). */
+size_t binlog_size_limit = 10 << 20;
+
+char *binlog_dir = NULL; /* directory holding the log files; NULL = binlog off */
+static int binlog_index = 0; /* numeric suffix of the newest binlog.<N> file */
+static int binlog_fd = -1; /* descriptor of the file being written, -1 if none */
+static int binlog_version = 1; /* format version stored at the start of each file */
+static size_t bytes_written; /* bytes written so far to the current file */
+
+static binlog first_binlog = NULL, last_binlog = NULL; /* oldest / newest record of the log file list */
+
+/* Scan binlog_dir for files named "binlog.<N>" where <N> parses completely as
+ * a base-10 integer.  Stores the largest N found in binlog_index and returns
+ * the smallest one.  Returns 0 without touching binlog_index when the
+ * directory cannot be opened.  The value 0 doubles as the "nothing seen yet"
+ * marker for both the minimum and the maximum. */
+static int
+binlog_scan_dir()
+{
+    static const char prefix[] = "binlog.";
+    const size_t prefix_len = sizeof(prefix) - 1;
+    DIR *dir;
+    struct dirent *entry;
+    char *tail;
+    long num, lowest = 0, highest = 0;
+
+    dir = opendir(binlog_dir);
+    if (dir == NULL) return 0;
+
+    for (entry = readdir(dir); entry != NULL; entry = readdir(dir)) {
+        /* need the prefix plus at least one more character */
+        if (strlen(entry->d_name) <= prefix_len) continue;
+        if (strncmp(prefix, entry->d_name, prefix_len) != 0) continue;
+
+        num = strtol(entry->d_name + prefix_len, &tail, 10);
+        if (tail == NULL || *tail != '\0') continue; /* trailing junk */
+
+        if (highest == 0 || num > highest) highest = num;
+        if (lowest == 0 || num < lowest) lowest = num;
+    }
+
+    closedir(dir);
+    binlog_index = (int) highest;
+    return (int) lowest;
+}
+
+/* Drop the oldest log file record: take it off the head of the list (clearing
+ * last_binlog as well when the list becomes empty), unlink its file from disk
+ * and release its memory.  Does nothing when the list is empty. */
+static void
+binlog_remove_first()
+{
+    binlog oldest = first_binlog;
+
+    if (oldest == NULL) return;
+
+    if ((first_binlog = oldest->next) == NULL) last_binlog = NULL;
+
+    unlink(oldest->path); /* the result of unlink() is not checked */
+    free(oldest);
+}
+
+/* Take one reference on b and hand b back so that calls can be chained.
+ * A NULL b is passed through untouched. */
+static binlog
+binlog_iref(binlog b)
+{
+    if (b == NULL) return NULL;
+
+    b->refs += 1;
+    return b;
+}
+
+/* Give up one reference on b.  NULL is ignored; releasing a record that has
+ * no references left only logs a warning.  When the count reaches zero, every
+ * unreferenced record at the head of the list is removed (and its file
+ * deleted), stopping at the first record that is still referenced. */
+static void
+binlog_dref(binlog b)
+{
+    if (b == NULL) return;
+
+    if (b->refs == 0) {
+        twarnx("refs is zero for binlog: %s", b->path);
+        return;
+    }
+
+    b->refs--;
+    if (b->refs != 0) return;
+
+    while (first_binlog != NULL && first_binlog->refs == 0) {
+        binlog_remove_first();
+    }
+}
+
+/* Replay one binlog file that is already open for reading on fd.
+ *
+ * The file starts with an int format version which must equal binlog_version.
+ * It is followed by records laid out as
+ *     size_t namelen; char tubename[namelen]; struct job js; [body bytes]
+ * The body is only present (and only read) for a READY or DELAYED record
+ * whose job id is not known yet; such a job is created and linked into
+ * binlog_jobs.  A JOB_STATE_INVALID record removes and frees the job.  Every
+ * other record just refreshes the state and counters of a known job.
+ *
+ * Replay of this file stops, after a warning, at the first short read or
+ * malformed record. */
+static void
+binlog_replay(int fd, job binlog_jobs)
+{
+    struct job js;
+    tube t;
+    job j;
+    char tubename[MAX_TUBE_NAME_LEN];
+    size_t namelen;
+    int version;
+
+    /* read() returns a signed count; compare as ssize_t so that -1 is not
+     * converted to a huge unsigned value and mistaken for success */
+    if (read(fd, &version, sizeof(version)) != (ssize_t) sizeof(version)) {
+        twarn("read()");
+        return;
+    }
+    if (version != binlog_version) {
+        twarnx("binlog version mismatch %d %d", version, binlog_version);
+        return;
+    }
+
+    while (read(fd, &namelen, sizeof(namelen)) == (ssize_t) sizeof(namelen)) {
+        /* tubename must hold namelen bytes plus the terminating NUL; never
+         * trust a length taken from the file */
+        if (namelen >= sizeof(tubename)) {
+            twarnx("binlog tube name too long: %lu", (unsigned long) namelen);
+            return;
+        }
+        if (namelen > 0 && read(fd, tubename, namelen) != (ssize_t) namelen) {
+            twarnx("oops %x %d", (unsigned int) namelen,
+                   (int) lseek(fd, 0, SEEK_CUR));
+            return;
+        }
+
+        tubename[namelen] = '\0';
+        if (read(fd, &js, sizeof(js)) != (ssize_t) sizeof(js)) {
+            twarn("read()");
+            return;
+        }
+
+        j = job_find(js.id);
+        switch (js.state) {
+        case JOB_STATE_INVALID:
+            if (j) {
+                job_remove(j);
+                binlog_dref(j->binlog);
+                job_free(j);
+                j = NULL;
+            }
+            break;
+        case JOB_STATE_READY:
+        case JOB_STATE_DELAYED:
+            if (!j) {
+                t = tube_find_or_make(tubename);
+                if (!t) {
+                    twarnx("OOM");
+                    return;
+                }
+                j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
+                                     t, js.id);
+                if (!j) return; /* make_job_with_id has already warned */
+                j->next = j->prev = j;
+                j->creation = js.creation;
+                job_insert(binlog_jobs, j);
+                if (read(fd, j->body, js.body_size) != (ssize_t) js.body_size) {
+                    twarn("read()");
+                    return;
+                }
+            }
+            break;
+        default:
+            break;
+        }
+        if (j) {
+            j->state = js.state;
+            j->deadline = js.deadline;
+            j->pri = js.pri;
+            j->delay = js.delay;
+            j->ttr = js.ttr;
+            j->timeout_ct = js.timeout_ct;
+            j->release_ct = js.release_ct;
+            j->bury_ct = js.bury_ct;
+            j->kick_ct = js.kick_ct;
+
+            /* this is a complete record, so we can move the binlog ref */
+            if (namelen && js.body_size) {
+                binlog_dref(j->binlog);
+                j->binlog = binlog_iref(last_binlog);
+            }
+        }
+    }
+}
+
+/* Close the log file currently open for writing and give up the reference
+ * held on last_binlog on its behalf.  A no-op when no file is open. */
+void
+binlog_close()
+{
+    if (binlog_fd >= 0) {
+        close(binlog_fd);
+        binlog_dref(last_binlog);
+        binlog_fd = -1;
+    }
+}
+
+/* Allocate a record for the log file at path and append it to the tail of the
+ * log file list (it also becomes the head when the list was empty).  The new
+ * record starts with zero references.  Returns the record, or NULL after
+ * logging "OOM" when the allocation fails. */
+static binlog
+add_binlog(char *path)
+{
+    size_t path_bytes = strlen(path) + 1; /* room for the NUL as well */
+    binlog node = (binlog) malloc(sizeof(struct binlog) + path_bytes);
+
+    if (node == NULL) return twarnx("OOM"), NULL;
+
+    strcpy(node->path, path);
+    node->refs = 0;
+    node->next = NULL;
+
+    if (last_binlog != NULL) last_binlog->next = node;
+    if (first_binlog == NULL) first_binlog = node;
+    last_binlog = node;
+
+    return node;
+}
+
+/* Create and open the next log file, "<binlog_dir>/binlog.<N>", where N is
+ * binlog_index after incrementing it.  A list record for the file is added
+ * and referenced on behalf of the writer, and the format version is written
+ * as the file header; bytes_written is reset to the size of that header.
+ *
+ * Returns the new descriptor, or -1 when no binlog directory is configured or
+ * any step fails (a warning is logged for each failure except the missing
+ * directory). */
+static int
+binlog_open()
+{
+    char path[PATH_MAX];
+    ssize_t hdr;
+    int fd, r;
+
+    if (!binlog_dir) return -1;
+    r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
+    /* snprintf reports the untruncated length, so r == PATH_MAX already
+     * means the terminating NUL did not fit */
+    if (r < 0 || r >= PATH_MAX) {
+        return twarnx("path too long: %s", binlog_dir), -1;
+    }
+
+    if (!binlog_iref(add_binlog(path))) return -1;
+    fd = open(path, O_WRONLY | O_CREAT, 0400);
+
+    if (fd < 0) {
+        twarn("Cannot open binlog %s", path);
+        return -1;
+    }
+
+    /* keep the result signed: stored straight into the size_t counter, a -1
+     * from write() would look like a very large successful write */
+    hdr = write(fd, &binlog_version, sizeof(binlog_version));
+    if (hdr < (ssize_t) sizeof(binlog_version)) {
+        twarn("Cannot write to binlog");
+        close(fd);
+        binlog_dref(last_binlog);
+        bytes_written = 0;
+        return -1;
+    }
+    bytes_written = (size_t) hdr;
+
+    return fd;
+}
+
+/* Rotate the log: finish the file currently being written and start the next
+ * one.  Does nothing when no file is open; binlog_fd ends up as -1 when the
+ * next file cannot be opened. */
+static void
+binlog_open_next()
+{
+    if (binlog_fd < 0) return;
+
+    /* binlog_close() closes the descriptor and drops the writer's reference
+     * on last_binlog */
+    binlog_close();
+    binlog_fd = binlog_open();
+}
+
+/* Append a record describing job j to the log file being written.
+ *
+ * Every record is: size_t tube name length, the tube name bytes, then the
+ * raw struct job.  The tube name and the job body are only included the
+ * first time a READY or DELAYED job is logged (j->binlog still NULL); later
+ * records for the same job carry a zero name length and no body.  A job in
+ * JOB_STATE_INVALID gives up its reference on the log file it was first
+ * written to.  Any other state is refused with a warning.
+ *
+ * NOTE(review): bury_job() calls this after setting JOB_STATE_BURIED, which
+ * ends up in the warning branch -- confirm whether buried jobs should be
+ * logged.
+ *
+ * When the record would push the file past binlog_size_limit, the log is
+ * rotated first.  Does nothing when no log file is open.  On a write error
+ * the binlog is closed. */
+void
+binlog_write_job(job j)
+{
+    size_t tube_namelen, to_write;
+    struct iovec vec[4], *vptr;
+    int vcnt = 3;
+
+    if (binlog_fd < 0) return;
+    tube_namelen = 0;
+
+    vec[0].iov_base = (char *) &tube_namelen;
+    vec[0].iov_len = sizeof(size_t);
+    to_write = sizeof(size_t);
+
+    vec[1].iov_base = j->tube->name;
+    vec[1].iov_len = 0;
+
+    /* we could save some bytes in the binlog file by only saving some parts of
+     * the job struct */
+    vec[2].iov_base = (char *) j;
+    vec[2].iov_len = sizeof(struct job);
+    to_write += sizeof(struct job);
+
+    if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
+        if (!j->binlog) {
+            tube_namelen = strlen(j->tube->name);
+            vec[1].iov_len = tube_namelen;
+            to_write += tube_namelen;
+            vcnt = 4;
+            vec[3].iov_base = j->body;
+            vec[3].iov_len = j->body_size;
+            to_write += j->body_size;
+        }
+    } else if (j->state == JOB_STATE_INVALID) {
+        if (j->binlog) binlog_dref(j->binlog);
+        j->binlog = NULL;
+    } else {
+        twarnx("unserializable job state: %d", j->state);
+        return;
+    }
+
+    if ((bytes_written + to_write) > binlog_size_limit) binlog_open_next();
+    if (binlog_fd < 0) return;
+
+    /* a job with a non-zero state that is not tied to a log file yet now
+     * references the file it is about to be written to */
+    if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog);
+
+    while (to_write > 0) {
+        /* writev() returns a signed count; it must stay signed for the error
+         * check below to be able to fire */
+        ssize_t r = writev(binlog_fd, vec, vcnt);
+        size_t written;
+
+        if (r < 0) {
+            twarn("Cannot write to binlog");
+            binlog_close();
+            return;
+        }
+
+        written = (size_t) r;
+        bytes_written += written;
+        to_write -= written;
+        if (to_write > 0 && written > 0) {
+            /* partial write: skip the vectors that went out completely and
+             * advance into the one that was cut short */
+            for (vptr = vec; written >= vptr->iov_len; vptr++) {
+                written -= vptr->iov_len;
+                vptr->iov_len = 0;
+            }
+            vptr->iov_base = (char *) vptr->iov_base + written;
+            vptr->iov_len -= written;
+        }
+    }
+}
+
+
+/* Replay every existing log file in binlog_dir, oldest index first, linking
+ * the recovered jobs into binlog_jobs.
+ *
+ * Does nothing when no binlog directory is configured.  The directory is
+ * created (mode 0700) when stat() on it fails; when the path exists but is
+ * not a directory a warning is logged and nothing is read.  Files in the
+ * index range that cannot be opened are reported and skipped.  Each file is
+ * kept referenced while it is being replayed, so a file that ends up with no
+ * job referencing it is removed once its replay is done. */
+void
+binlog_read(job binlog_jobs)
+{
+    int binlog_index_min;
+    struct stat sbuf;
+    int fd, idx, r;
+    char path[PATH_MAX];
+    binlog b;
+
+    if (!binlog_dir) return;
+
+    if (stat(binlog_dir, &sbuf) < 0) {
+        if (mkdir(binlog_dir, 0700) < 0) {
+            twarn("%s", binlog_dir);
+            return;
+        }
+    } else if (!S_ISDIR(sbuf.st_mode)) {
+        /* S_ISDIR compares the whole file type field; testing the S_IFDIR
+         * bit alone would also accept block devices and sockets */
+        twarnx("%s", binlog_dir);
+        return;
+    }
+
+    binlog_index_min = binlog_scan_dir();
+
+    if (binlog_index_min) {
+        for (idx = binlog_index_min; idx <= binlog_index; idx++) {
+            r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
+            /* r == PATH_MAX already means the path was truncated */
+            if (r < 0 || r >= PATH_MAX) {
+                twarnx("path too long: %s", binlog_dir);
+                return;
+            }
+
+            fd = open(path, O_RDONLY);
+
+            if (fd < 0) {
+                twarn("%s", path);
+            } else {
+                b = binlog_iref(add_binlog(path));
+                binlog_replay(fd, binlog_jobs);
+                close(fd);
+                binlog_dref(b);
+            }
+        }
+
+    }
+}
+
+/* Open a fresh log file for writing and remember its descriptor in binlog_fd
+ * (which becomes -1 when binlog_open() cannot provide one). */
+void
+binlog_init()
+{
+    int fd = binlog_open();
+
+    binlog_fd = fd;
+}
+
diff --git a/binlog.h b/binlog.h
new file mode 100644
index 00000000..68a41503
--- /dev/null
+++ b/binlog.h
@@ -0,0 +1,39 @@
+/* binlog.h - binary log implementation */
+
+/* Copyright (C) 2008 Graham Barr
+
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef binlog_h
+#define binlog_h
+
+#include "job.h"
+
+typedef struct binlog *binlog; /* handle type: pointer to one log file record */
+
+struct binlog {
+ binlog next; /* next record in the singly linked list, NULL at the tail */
+ unsigned int refs; /* reference count; binlog.c deletes the file once this is zero at the head of the list */
+ char path[]; /* NUL-terminated path of the log file, allocated together with the struct */
+};
+
+/* Directory holding the binlog files; while it is NULL nothing is read or
+ * written. */
+extern char *binlog_dir;
+
+/* Open a fresh binlog file for writing. */
+void binlog_init();
+/* Append a record for job j to the binlog file being written. */
+void binlog_write_job(job j);
+/* Replay the existing binlog files, linking recovered jobs into binlog_jobs. */
+void binlog_read(job binlog_jobs);
+/* Close the binlog file currently open for writing, if any. */
+void binlog_close();
+
+#endif
+
diff --git a/job.c b/job.c
index 0fb72ffd..46cf7cd2 100644
--- a/job.c
+++ b/job.c
@@ -116,20 +116,26 @@ allocate_job(int body_size)
j->next = j->prev = j; /* not in a linked list */
j->ht_next = NULL;
j->tube = NULL;
+ j->binlog = NULL;
return j;
}
job
-make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size,
- tube tube)
+make_job_with_id(unsigned int pri, unsigned int delay, unsigned int ttr,
+ int body_size, tube tube, unsigned long long id)
{
job j;
j = allocate_job(body_size);
if (!j) return twarnx("OOM"), NULL;
- j->id = next_id++;
+ if (id) {
+ j->id = id;
+ if (id >= next_id) next_id = id + 1;
+ } else {
+ j->id = next_id++;
+ }
j->pri = pri;
j->delay = delay;
j->ttr = ttr;
diff --git a/job.h b/job.h
index 670b8ef9..117ce4f5 100644
--- a/job.h
+++ b/job.h
@@ -47,14 +47,17 @@ struct job {
unsigned int kick_ct;
tube tube;
void *reserver;
+ void *binlog;
char state;
job ht_next; /* Next job in a hash table list */
char body[];
};
+#define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
+
job allocate_job(int body_size);
-job make_job(unsigned int pri, unsigned int delay, unsigned int ttr,
- int body_size, tube tube);
+job make_job_with_id(unsigned int pri, unsigned int delay, unsigned int ttr,
+ int body_size, tube tube, unsigned long long id);
void job_free(job j);
/* Lookup a job by job ID */
diff --git a/prot.c b/prot.c
index 74def2e1..02f195cf 100644
--- a/prot.c
+++ b/prot.c
@@ -34,6 +34,7 @@
#include "conn.h"
#include "util.h"
#include "net.h"
+#include "binlog.h"
#include "version.h"
/* job body cannot be greater than this many bytes long */
@@ -222,7 +223,6 @@ static unsigned int ready_ct = 0;
static struct stats global_stat = {0, 0, 0, 0, 0};
static tube default_tube;
-static struct ms tubes;
static int drain_mode = 0;
static time_t start_time;
@@ -434,6 +434,7 @@ enqueue_job(job j, unsigned int delay)
j->tube->stat.urgent_ct++;
}
}
+ binlog_write_job(j);
process_queue();
return 1;
}
@@ -447,6 +448,7 @@ bury_job(job j)
j->state = JOB_STATE_BURIED;
j->reserver=NULL;
j->bury_ct++;
+ binlog_write_job(j);
}
void
@@ -978,48 +980,12 @@ name_is_ok(const char *name, size_t max)
strspn(name, NAME_CHARS) == len && name[0] != '-';
}
-static tube
-find_tube(const char *name)
-{
- tube t;
- size_t i;
-
- for (i = 0; i < tubes.used; i++) {
- t = tubes.items[i];
- if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
- }
- return NULL;
-}
-
void
prot_remove_tube(tube t)
{
ms_remove(&tubes, t);
}
-static tube
-make_and_insert_tube(const char *name)
-{
- int r;
- tube t = NULL;
-
- t = make_tube(name);
- if (!t) return NULL;
-
- /* We want this global tube list to behave like "weak" refs, so don't
- * increment the ref count. */
- r = ms_append(&tubes, t);
- if (!r) return tube_dref(t), NULL;
-
- return t;
-}
-
-static tube
-find_or_make_tube(const char *name)
-{
- return find_tube(name) ? : make_and_insert_tube(name);
-}
-
static void
dispatch_cmd(conn c)
{
@@ -1175,6 +1141,8 @@ dispatch_cmd(conn c)
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
+ j->state = JOB_STATE_INVALID;
+ binlog_write_job(j);
job_free(j);
reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
@@ -1264,7 +1232,7 @@ dispatch_cmd(conn c)
op_ct[type]++;
- t = find_tube(name);
+ t = tube_find(name);
if (!t) return reply_msg(c, MSG_NOTFOUND);
do_stats(c, (fmt_fn) fmt_stats_tube, t);
@@ -1302,7 +1270,7 @@ dispatch_cmd(conn c)
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
- TUBE_ASSIGN(t, find_or_make_tube(name));
+ TUBE_ASSIGN(t, tube_find_or_make(name));
if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
c->use->using_ct--;
@@ -1317,7 +1285,7 @@ dispatch_cmd(conn c)
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
- TUBE_ASSIGN(t, find_or_make_tube(name));
+ TUBE_ASSIGN(t, tube_find_or_make(name));
if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
r = 1;
@@ -1612,6 +1580,32 @@ prot_init()
ms_init(&tubes, NULL, NULL);
- TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
+ TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
if (!default_tube) twarnx("Out of memory during startup!");
}
+
+/* Rebuild the in-memory queues from the binlog.  binlog_read() collects the
+ * recovered jobs on a temporary list; each one is then taken off that list
+ * and either buried (JOB_STATE_BURIED) or enqueued.  A delayed job is
+ * enqueued with the part of its delay still outstanding relative to
+ * start_time, and with no delay when its deadline is not after start_time. */
+void
+prot_replay_binlog()
+{
+    struct job recovered;
+    job cur, following;
+    unsigned int remaining;
+
+    recovered.prev = recovered.next = &recovered;
+    binlog_read(&recovered);
+
+    cur = recovered.next;
+    while (cur != &recovered) {
+        following = cur->next; /* saved before cur is taken off the list */
+        job_remove(cur);
+
+        if (cur->state == JOB_STATE_BURIED) {
+            bury_job(cur);
+        } else {
+            remaining = 0;
+            if (cur->state == JOB_STATE_DELAYED && start_time < cur->deadline) {
+                remaining = cur->deadline - start_time;
+            }
+            enqueue_job(cur, remaining);
+        }
+
+        cur = following;
+    }
+}
diff --git a/prot.h b/prot.h
index 72ccad3e..f2410c37 100644
--- a/prot.h
+++ b/prot.h
@@ -35,5 +35,6 @@ void enqueue_reserved_jobs(conn c);
void enter_drain_mode(int sig);
void h_accept(const int fd, const short which, struct event *ev);
void prot_remove_tube(tube t);
+void prot_replay_binlog();
#endif /*prot_h*/
diff --git a/tube.c b/tube.c
index c0333770..86de1d96 100644
--- a/tube.c
+++ b/tube.c
@@ -19,11 +19,14 @@
#include
#include
+#include "ms.h"
#include "stat.h"
#include "tube.h"
#include "prot.h"
#include "util.h"
+struct ms tubes;
+
tube
make_tube(const char *name)
{
@@ -74,3 +77,40 @@ tube_iref(tube t)
if (!t) return;
++t->refs;
}
+
+/* Create a tube called name and register it in the global tubes list.
+ * Returns the new tube, or NULL when either the tube or its list slot cannot
+ * be allocated (the tube is released again in the latter case). */
+static tube
+make_and_insert_tube(const char *name)
+{
+    tube created = make_tube(name);
+
+    if (created == NULL) return NULL;
+
+    /* We want this global tube list to behave like "weak" refs, so don't
+     * increment the ref count. */
+    if (ms_append(&tubes, created)) return created;
+
+    tube_dref(created);
+    return NULL;
+}
+
+/* Look up a tube by name in the global tubes list, comparing at most
+ * MAX_TUBE_NAME_LEN characters.  Returns the first match, or NULL when no
+ * tube has that name. */
+tube
+tube_find(const char *name)
+{
+    size_t idx = 0;
+
+    while (idx < tubes.used) {
+        tube candidate = tubes.items[idx++];
+
+        if (!strncmp(candidate->name, name, MAX_TUBE_NAME_LEN)) {
+            return candidate;
+        }
+    }
+    return NULL;
+}
+
+/* Return the tube called name, creating and registering it first when it does
+ * not exist yet.  Returns NULL when a new tube is needed but cannot be
+ * created. */
+tube
+tube_find_or_make(const char *name)
+{
+    tube found = tube_find(name);
+
+    if (found) return found;
+    return make_and_insert_tube(name);
+}
+
diff --git a/tube.h b/tube.h
index 0abe85ef..ccd78d3e 100644
--- a/tube.h
+++ b/tube.h
@@ -42,9 +42,13 @@ struct tube {
unsigned int watching_ct;
};
+extern struct ms tubes;
+
tube make_tube(const char *name);
void tube_dref(tube t);
void tube_iref(tube t);
+tube tube_find(const char *name);
+tube tube_find_or_make(const char *name);
#define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
#endif /*tube_h*/