Skip to content

Commit

Permalink
raft: Introduce local_data and is_local for entries
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Apr 29, 2024
1 parent b5c4de3 commit 44dbf91
Show file tree
Hide file tree
Showing 29 changed files with 210 additions and 130 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ jobs:
env:
CC: ${{ matrix.compiler }}
LIBDQLITE_TRACE: 1
ASAN_OPTIONS: fast_unwind_on_malloc=0
run: |
make check || (cat ./test-suite.log && false)
Expand Down
10 changes: 10 additions & 0 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ static void leaderMaybeCheckpointLegacy(struct leader *l)
tracef("raft_malloc - no mem");
goto err_after_buf_alloc;
}
#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, apply, &buf, 1, leaderCheckpointApplyCb);
#else
rv = raft_apply(l->raft, apply, &buf, NULL, 1, leaderCheckpointApplyCb);
#endif
if (rv != 0) {
tracef("raft_apply failed %d", rv);
raft_free(apply);
Expand Down Expand Up @@ -332,7 +336,13 @@ static int leaderApplyFrames(struct exec *req,
apply->type = COMMAND_FRAMES;
idSet(apply->req.req_id, req->id);

#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb);
#else
/* TODO actual WAL slice goes here */
struct raft_entry_local_data local_data = {};
rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb);
#endif
if (rv != 0) {
tracef("raft apply failed %d", rv);
goto err_after_command_encode;
Expand Down
8 changes: 8 additions & 0 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ enum {
RAFT_CHANGE /* Raft configuration change. */
};

/* Fixed-size block of node-local data carried by each raft log entry
 * (see the local_data field of struct raft_entry). Callers of raft_apply()
 * may supply one of these per entry; entries appended without one get a
 * zero-initialized value. */
struct raft_entry_local_data {
/* Must be the only member of this struct: the batch header encoder writes
 * sizeof(struct raft_entry_local_data) as the per-entry local data size,
 * and the decoder rejects sizes that are zero, larger than this struct, or
 * not a multiple of 8 bytes. */
uint8_t buf[16];
};

/**
* A single entry in the raft log.
*
Expand Down Expand Up @@ -226,6 +231,8 @@ struct raft_entry
raft_term term; /* Term in which the entry was created. */
unsigned short type; /* Type (FSM command, barrier, config change). */
struct raft_buffer buf; /* Entry data. */
struct raft_entry_local_data local_data; /* Populated locally, persisted, not sent. */
bool is_local; /* Not persisted; true if we originated this entry. */
void *batch; /* Batch that buf's memory points to, if any. */
};

Expand Down Expand Up @@ -1082,6 +1089,7 @@ struct raft_apply
RAFT_API int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
/* Per-entry local data, indexed in parallel with bufs[] (n elements).
 * May be NULL, in which case every appended entry gets zero-initialized
 * local data. */
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb);

Expand Down
5 changes: 3 additions & 2 deletions src/raft/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb)
{
Expand Down Expand Up @@ -41,7 +42,7 @@ int raft_apply(struct raft *r,
req->cb = cb;

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, n);
rv = logAppendCommands(r->log, r->current_term, bufs, local_data, n);
if (rv != 0) {
goto err;
}
Expand Down Expand Up @@ -90,7 +91,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
req->index = index;
req->cb = cb;

rv = logAppend(r->log, r->current_term, RAFT_BARRIER, &buf, NULL);
rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, (struct raft_entry_local_data){}, true, NULL);
if (rv != 0) {
goto err_after_buf_alloc;
}
Expand Down
16 changes: 10 additions & 6 deletions src/raft/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ int entryCopy(const struct raft_entry *src, struct raft_entry *dst)
if (dst->buf.len > 0 && dst->buf.base == NULL) {
return RAFT_NOMEM;
}
assert(dst->buf.len > 0);
memcpy(dst->buf.base, src->buf.base, dst->buf.len);
dst->batch = NULL;
return 0;
Expand Down Expand Up @@ -71,13 +72,16 @@ int entryBatchCopy(const struct raft_entry *src,

cursor = batch;

struct raft_entry *out = *dst;
for (i = 0; i < n; i++) {
(*dst)[i].term = src[i].term;
(*dst)[i].type = src[i].type;
(*dst)[i].buf.base = cursor;
(*dst)[i].buf.len = src[i].buf.len;
(*dst)[i].batch = batch;
memcpy((*dst)[i].buf.base, src[i].buf.base, src[i].buf.len);
out[i].term = src[i].term;
out[i].type = src[i].type;
out[i].buf.base = cursor;
out[i].buf.len = src[i].buf.len;
out[i].batch = batch;
if (out[i].buf.len > 0) {
memcpy(out[i].buf.base, src[i].buf.base, out[i].buf.len);
}
cursor += src[i].buf.len;
}
return 0;
Expand Down
3 changes: 2 additions & 1 deletion src/raft/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,8 @@ static void copyLeaderLog(struct raft_fixture *f)
buf.base = raft_malloc(buf.len);
assert(buf.base != NULL);
memcpy(buf.base, entry->buf.base, buf.len);
rv = logAppend(f->log, entry->term, entry->type, &buf, NULL);
/* FIXME(cole) what to do here for is_local? */
rv = logAppend(f->log, entry->term, entry->type, buf, (struct raft_entry_local_data){}, false, NULL);
assert(rv == 0);
}
logRelease(raft->log, 1, entries, n);
Expand Down
19 changes: 12 additions & 7 deletions src/raft/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <string.h>

#include "../raft.h"
#include "../tracing.h"
#include "assert.h"
#include "configuration.h"

Expand Down Expand Up @@ -545,7 +546,9 @@ int logReinstate(struct raft_log *l,
int logAppend(struct raft_log *l,
const raft_term term,
const unsigned short type,
const struct raft_buffer *buf,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch)
{
int rv;
Expand All @@ -556,7 +559,6 @@ int logAppend(struct raft_log *l,
assert(term > 0);
assert(type == RAFT_CHANGE || type == RAFT_BARRIER ||
type == RAFT_COMMAND);
assert(buf != NULL);

rv = ensureCapacity(l);
if (rv != 0) {
Expand All @@ -565,16 +567,18 @@ int logAppend(struct raft_log *l,

index = logLastIndex(l) + 1;

rv = refsInit(l, term, index, *buf, batch);
rv = refsInit(l, term, index, buf, batch);
if (rv != 0) {
return rv;
}

entry = &l->entries[l->back];
entry->term = term;
entry->type = type;
entry->buf = *buf;
entry->buf = buf;
entry->batch = batch;
entry->local_data = local_data;
entry->is_local = is_local;

l->back += 1;
l->back = l->back % l->size;
Expand All @@ -585,6 +589,7 @@ int logAppend(struct raft_log *l,
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n)
{
unsigned i;
Expand All @@ -596,8 +601,8 @@ int logAppendCommands(struct raft_log *l,
assert(n > 0);

for (i = 0; i < n; i++) {
const struct raft_buffer *buf = &bufs[i];
rv = logAppend(l, term, RAFT_COMMAND, buf, NULL);
struct raft_entry_local_data loc = (local_data != NULL) ? local_data[i] : (struct raft_entry_local_data){};
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], loc, true, NULL);
if (rv != 0) {
return rv;
}
Expand All @@ -624,7 +629,7 @@ int logAppendConfiguration(struct raft_log *l,
}

/* Append the new entry to the log. */
rv = logAppend(l, term, RAFT_CHANGE, &buf, NULL);
rv = logAppend(l, term, RAFT_CHANGE, buf, (struct raft_entry_local_data){}, true, NULL);
if (rv != 0) {
goto err_after_encode;
}
Expand Down
5 changes: 4 additions & 1 deletion src/raft/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ int logReinstate(struct raft_log *l,
int logAppend(struct raft_log *l,
raft_term term,
unsigned short type,
const struct raft_buffer *buf,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch);

/* Convenience to append a series of #RAFT_COMMAND entries.
 *
 * The i-th entry is built from bufs[i] and local_data[i], for i in [0, n).
 * local_data may be NULL, in which case each entry gets zero-initialized
 * local data. Every entry is appended with is_local set to true and with no
 * batch. Returns 0 on success, or the first non-zero code returned by
 * logAppend(). */
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n);

/* Convenience to encode and append a single #RAFT_CHANGE entry. */
Expand Down
2 changes: 1 addition & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ int replicationAppend(struct raft *r,
goto err_after_request_alloc;
}

rv = logAppend(r->log, copy.term, copy.type, &copy.buf, NULL);
rv = logAppend(r->log, copy.term, copy.type, copy.buf, (struct raft_entry_local_data){}, false, NULL);
if (rv != 0) {
goto err_after_request_alloc;
}
Expand Down
4 changes: 2 additions & 2 deletions src/raft/start.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ static int restoreEntries(struct raft *r,
r->last_stored = start_index - 1;
for (i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
rv = logAppend(r->log, entry->term, entry->type, &entry->buf,
entry->batch);
rv = logAppend(r->log, entry->term, entry->type, entry->buf,
entry->local_data, entry->is_local, entry->batch);
if (rv != 0) {
goto err;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ static size_t uvAppendSize(struct uvAppend *a)
{
size_t size = sizeof(uint32_t) * 2; /* CRC checksums */
unsigned i;
size += uvSizeofBatchHeader(a->n); /* Batch header */
size += uvSizeofBatchHeader(a->n, true); /* Batch header */
for (i = 0; i < a->n; i++) { /* Entries data */
size += bytePad64(a->entries[i].buf.len);
}
Expand Down
66 changes: 45 additions & 21 deletions src/raft/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "assert.h"
#include "byte.h"
#include "configuration.h"
#include "../tracing.h"

/**
* Size of the request preamble.
Expand Down Expand Up @@ -86,10 +87,14 @@ static size_t sizeofTimeoutNow(void)
sizeof(uint64_t) /* Last log term. */;
}

size_t uvSizeofBatchHeader(size_t n)
size_t uvSizeofBatchHeader(size_t n, bool with_local_data)
{
return 8 + /* Number of entries in the batch, little endian */
16 * n /* One header per entry */;
size_t res = 8 + /* Number of entries in the batch, little endian */
16 * n; /* One header per entry */;
if (with_local_data) {
res += 8; /* Local data length, applies to all entries */
}
return res;
}

static void encodeRequestVote(const struct raft_request_vote *p, void *buf)
Expand Down Expand Up @@ -137,7 +142,7 @@ static void encodeAppendEntries(const struct raft_append_entries *p, void *buf)
bytePut64(&cursor, p->prev_log_term); /* Previous term. */
bytePut64(&cursor, p->leader_commit); /* Commit index. */

uvEncodeBatchHeader(p->entries, p->n_entries, cursor);
uvEncodeBatchHeader(p->entries, p->n_entries, cursor, false /* no local data */);
}

static void encodeAppendEntriesResult(
Expand Down Expand Up @@ -295,14 +300,20 @@ int uvEncodeMessage(const struct raft_message *message,

void uvEncodeBatchHeader(const struct raft_entry *entries,
unsigned n,
void *buf)
void *buf,
bool with_local_data)
{
unsigned i;
void *cursor = buf;

/* Number of entries in the batch, little endian */
bytePut64(&cursor, n);

if (with_local_data) {
/* Local data size per entry, little endian */
bytePut64(&cursor, (uint64_t)sizeof(struct raft_entry_local_data));
}

for (i = 0; i < n; i++) {
const struct raft_entry *entry = &entries[i];

Expand Down Expand Up @@ -363,7 +374,8 @@ static void decodeRequestVoteResult(const uv_buf_t *buf,

int uvDecodeBatchHeader(const void *batch,
struct raft_entry **entries,
unsigned *n)
unsigned *n,
uint64_t *local_data_size)
{
const void *cursor = batch;
size_t i;
Expand All @@ -376,6 +388,15 @@ int uvDecodeBatchHeader(const void *batch,
return 0;
}

if (local_data_size != NULL) {
uint64_t z = byteGet64(&cursor);
if (z == 0 || z > sizeof(struct raft_entry_local_data) || z % sizeof(uint64_t) != 0) {
rv = RAFT_MALFORMED;
goto err;
}
*local_data_size = z;
}

*entries = raft_malloc(*n * sizeof **entries);

if (*entries == NULL) {
Expand Down Expand Up @@ -430,7 +451,7 @@ static int decodeAppendEntries(const uv_buf_t *buf,
args->prev_log_term = byteGet64(&cursor);
args->leader_commit = byteGet64(&cursor);

rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries);
rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries, false);
if (rv != 0) {
return rv;
}
Expand Down Expand Up @@ -549,33 +570,36 @@ int uvDecodeMessage(uint16_t type,
return rv;
}

void uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n)
int uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n,
uint64_t local_data_size)
{
uint8_t *cursor;
size_t i;

assert(batch != NULL);

cursor = batch + offset;

for (i = 0; i < n; i++) {
for (size_t i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
entry->batch = batch;
entry->buf.base = (entry->buf.len > 0) ? cursor : NULL;

if (entry->buf.len == 0) {
entry->buf.base = NULL;
continue;
}

entry->buf.base = cursor;

cursor = cursor + entry->buf.len;
cursor += entry->buf.len;
if (entry->buf.len % 8 != 0) {
/* Add padding */
cursor = cursor + 8 - (entry->buf.len % 8);
}

entry->is_local = false;

entry->local_data = (struct raft_entry_local_data){};
assert(local_data_size <= sizeof(entry->local_data.buf));
assert(local_data_size % 8 == 0);
memcpy(entry->local_data.buf, cursor, local_data_size);
cursor += local_data_size;
}
return 0;
}
Loading

0 comments on commit 44dbf91

Please sign in to comment.