Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local_data and is_local to raft_entry #639

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ static void leaderMaybeCheckpointLegacy(struct leader *l)
tracef("raft_malloc - no mem");
goto err_after_buf_alloc;
}
#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, apply, &buf, 1, leaderCheckpointApplyCb);
#else
rv = raft_apply(l->raft, apply, &buf, NULL, 1, leaderCheckpointApplyCb);
#endif
if (rv != 0) {
tracef("raft_apply failed %d", rv);
raft_free(apply);
Expand Down Expand Up @@ -332,7 +336,13 @@ static int leaderApplyFrames(struct exec *req,
apply->type = COMMAND_FRAMES;
idSet(apply->req.req_id, req->id);

#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb);
#else
/* TODO actual WAL slice goes here */
struct raft_entry_local_data local_data = {};
rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb);
#endif
if (rv != 0) {
tracef("raft apply failed %d", rv);
goto err_after_command_encode;
Expand Down
42 changes: 40 additions & 2 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,28 @@ enum {
RAFT_CHANGE /* Raft configuration change. */
};

/**
* A small fixed-size inline buffer that stores extra data for a raft_entry
* that is different for each node in the cluster.
*
* A leader initializes the local data for an entry before passing it into
* raft_apply. This local data is stored in the volatile raft log and also
* in the persistent raft log on the leader. AppendEntries messages sent by
* the leader never contain the local data for entries.
*
* When a follower accepts an AppendEntries request, it invokes a callback
* provided by the FSM to fill out the local data for each new entry before
* appending the entries to its log (volatile and persistent). This local
* data doesn't have to be the same as the local data that the leader computed.
*
* When starting up, a raft node reads the local data for each entry for its
* persistent log as part of populating the volatile log.
*/
struct raft_entry_local_data {
/* Must be the only member of this struct. */
uint8_t buf[16];
};

/**
* A single entry in the raft log.
*
Expand All @@ -220,12 +242,27 @@ enum {
*
* This arrangement makes it possible to minimize the amount of memory-copying
* when performing I/O.
*
* The @is_local field is set to `true` by a leader that appends an entry to its
* volatile log. It is set to `false` by a follower that copies an entry received
* via AppendEntries to its volatile log. It is not represented in the AppendEntries
* message or in the persistent log. This field can be used by the FSM's `apply`
* callback to handle a COMMAND entry differently depending on whether it
* originated locally.
*
* Note: The @local_data and @is_local fields do not exist when we use an external
* libraft, because the last separate release of libraft predates their addition.
* The ifdef at the very top of this file ensures that we use the system raft headers
* when we build against an external libraft, so there will be no ABI mismatch as
* a result of incompatible struct layouts.
*/
struct raft_entry
{
raft_term term; /* Term in which the entry was created. */
unsigned short type; /* Type (FSM command, barrier, config change). */
raft_term term; /* Term in which the entry was created. */
unsigned short type; /* Type (FSM command, barrier, config change). */
bool is_local; /* Placed here so it goes in the padding after @type. */
struct raft_buffer buf; /* Entry data. */
struct raft_entry_local_data local_data;
void *batch; /* Batch that buf's memory points to, if any. */
};

Expand Down Expand Up @@ -1082,6 +1119,7 @@ struct raft_apply
RAFT_API int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb);

Expand Down
5 changes: 3 additions & 2 deletions src/raft/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb)
{
Expand Down Expand Up @@ -41,7 +42,7 @@ int raft_apply(struct raft *r,
req->cb = cb;

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, n);
rv = logAppendCommands(r->log, r->current_term, bufs, local_data, n);
if (rv != 0) {
goto err;
}
Expand Down Expand Up @@ -90,7 +91,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
req->index = index;
req->cb = cb;

rv = logAppend(r->log, r->current_term, RAFT_BARRIER, &buf, NULL);
rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, (struct raft_entry_local_data){}, true, NULL);
if (rv != 0) {
goto err_after_buf_alloc;
}
Expand Down
3 changes: 2 additions & 1 deletion src/raft/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,8 @@ static void copyLeaderLog(struct raft_fixture *f)
buf.base = raft_malloc(buf.len);
assert(buf.base != NULL);
memcpy(buf.base, entry->buf.base, buf.len);
rv = logAppend(f->log, entry->term, entry->type, &buf, NULL);
/* FIXME(cole) what to do here for is_local? */
rv = logAppend(f->log, entry->term, entry->type, buf, (struct raft_entry_local_data){}, false, NULL);
assert(rv == 0);
}
logRelease(raft->log, 1, entries, n);
Expand Down
18 changes: 11 additions & 7 deletions src/raft/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,9 @@ int logReinstate(struct raft_log *l,
int logAppend(struct raft_log *l,
const raft_term term,
const unsigned short type,
const struct raft_buffer *buf,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch)
{
int rv;
Expand All @@ -556,7 +558,6 @@ int logAppend(struct raft_log *l,
assert(term > 0);
assert(type == RAFT_CHANGE || type == RAFT_BARRIER ||
type == RAFT_COMMAND);
assert(buf != NULL);

rv = ensureCapacity(l);
if (rv != 0) {
Expand All @@ -565,16 +566,18 @@ int logAppend(struct raft_log *l,

index = logLastIndex(l) + 1;

rv = refsInit(l, term, index, *buf, batch);
rv = refsInit(l, term, index, buf, batch);
if (rv != 0) {
return rv;
}

entry = &l->entries[l->back];
entry->term = term;
entry->type = type;
entry->buf = *buf;
entry->buf = buf;
entry->batch = batch;
entry->local_data = local_data;
entry->is_local = is_local;

l->back += 1;
l->back = l->back % l->size;
Expand All @@ -585,6 +588,7 @@ int logAppend(struct raft_log *l,
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n)
{
unsigned i;
Expand All @@ -596,8 +600,8 @@ int logAppendCommands(struct raft_log *l,
assert(n > 0);

for (i = 0; i < n; i++) {
const struct raft_buffer *buf = &bufs[i];
rv = logAppend(l, term, RAFT_COMMAND, buf, NULL);
struct raft_entry_local_data loc = (local_data != NULL) ? local_data[i] : (struct raft_entry_local_data){};
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], loc, true, NULL);
if (rv != 0) {
return rv;
}
Expand All @@ -624,7 +628,7 @@ int logAppendConfiguration(struct raft_log *l,
}

/* Append the new entry to the log. */
rv = logAppend(l, term, RAFT_CHANGE, &buf, NULL);
rv = logAppend(l, term, RAFT_CHANGE, buf, (struct raft_entry_local_data){}, true, NULL);
if (rv != 0) {
goto err_after_encode;
}
Expand Down
5 changes: 4 additions & 1 deletion src/raft/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ int logReinstate(struct raft_log *l,
int logAppend(struct raft_log *l,
raft_term term,
unsigned short type,
const struct raft_buffer *buf,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch);

/* Convenience to append a series of #RAFT_COMMAND entries. */
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n);

/* Convenience to encode and append a single #RAFT_CHANGE entry. */
Expand Down
2 changes: 1 addition & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ int replicationAppend(struct raft *r,
goto err_after_request_alloc;
}

rv = logAppend(r->log, copy.term, copy.type, &copy.buf, NULL);
rv = logAppend(r->log, copy.term, copy.type, copy.buf, (struct raft_entry_local_data){}, false, NULL);
if (rv != 0) {
goto err_after_request_alloc;
}
Expand Down
4 changes: 2 additions & 2 deletions src/raft/start.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ static int restoreEntries(struct raft *r,
r->last_stored = start_index - 1;
for (i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
rv = logAppend(r->log, entry->term, entry->type, &entry->buf,
entry->batch);
rv = logAppend(r->log, entry->term, entry->type, entry->buf,
entry->local_data, entry->is_local, entry->batch);
if (rv != 0) {
goto err;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ static size_t uvAppendSize(struct uvAppend *a)
{
size_t size = sizeof(uint32_t) * 2; /* CRC checksums */
unsigned i;
size += uvSizeofBatchHeader(a->n); /* Batch header */
size += uvSizeofBatchHeader(a->n, true); /* Batch header */
for (i = 0; i < a->n; i++) { /* Entries data */
size += bytePad64(a->entries[i].buf.len);
}
Expand Down
73 changes: 52 additions & 21 deletions src/raft/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@
sizeof(uint64_t) /* Last log term. */;
}

size_t uvSizeofBatchHeader(size_t n)
size_t uvSizeofBatchHeader(size_t n, bool with_local_data)
{
return 8 + /* Number of entries in the batch, little endian */
16 * n /* One header per entry */;
size_t res = 8 + /* Number of entries in the batch, little endian */
16 * n; /* One header per entry */;
if (with_local_data) {
#ifdef DQLITE_NEXT
res += 8; /* Local data length, applies to all entries */
#endif
}
return res;
}

static void encodeRequestVote(const struct raft_request_vote *p, void *buf)
Expand Down Expand Up @@ -137,7 +143,7 @@
bytePut64(&cursor, p->prev_log_term); /* Previous term. */
bytePut64(&cursor, p->leader_commit); /* Commit index. */

uvEncodeBatchHeader(p->entries, p->n_entries, cursor);
uvEncodeBatchHeader(p->entries, p->n_entries, cursor, false /* no local data */);
}

static void encodeAppendEntriesResult(
Expand Down Expand Up @@ -295,14 +301,22 @@

void uvEncodeBatchHeader(const struct raft_entry *entries,
unsigned n,
void *buf)
void *buf,
bool with_local_data)
{
unsigned i;
void *cursor = buf;

/* Number of entries in the batch, little endian */
bytePut64(&cursor, n);

if (with_local_data) {
#ifdef DQLITE_NEXT
/* Local data size per entry, little endian */
bytePut64(&cursor, (uint64_t)sizeof(struct raft_entry_local_data));
#endif
}

for (i = 0; i < n; i++) {
const struct raft_entry *entry = &entries[i];

Expand Down Expand Up @@ -363,7 +377,8 @@

int uvDecodeBatchHeader(const void *batch,
struct raft_entry **entries,
unsigned *n)
unsigned *n,
uint64_t *local_data_size)
{
const void *cursor = batch;
size_t i;
Expand All @@ -376,6 +391,17 @@
return 0;
}

if (local_data_size != NULL) {
#ifdef DQLITE_NEXT
uint64_t z = byteGet64(&cursor);
if (z == 0 || z > sizeof(struct raft_entry_local_data) || z % sizeof(uint64_t) != 0) {
rv = RAFT_MALFORMED;
goto err;

Check warning on line 399 in src/raft/uv_encoding.c

View check run for this annotation

Codecov / codecov/patch

src/raft/uv_encoding.c#L398-L399

Added lines #L398 - L399 were not covered by tests
}
*local_data_size = z;
#endif
}

*entries = raft_malloc(*n * sizeof **entries);

if (*entries == NULL) {
Expand Down Expand Up @@ -430,7 +456,7 @@
args->prev_log_term = byteGet64(&cursor);
args->leader_commit = byteGet64(&cursor);

rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries);
rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries, false);
if (rv != 0) {
return rv;
}
Expand Down Expand Up @@ -549,33 +575,38 @@
return rv;
}

void uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n)
int uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n,
uint64_t local_data_size)
{
uint8_t *cursor;
size_t i;

assert(batch != NULL);

cursor = batch + offset;

for (i = 0; i < n; i++) {
for (size_t i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
entry->batch = batch;
entry->buf.base = (entry->buf.len > 0) ? cursor : NULL;

if (entry->buf.len == 0) {
entry->buf.base = NULL;
continue;
}

entry->buf.base = cursor;

cursor = cursor + entry->buf.len;
cursor += entry->buf.len;
if (entry->buf.len % 8 != 0) {
/* Add padding */
cursor = cursor + 8 - (entry->buf.len % 8);
}

entry->is_local = false;

entry->local_data = (struct raft_entry_local_data){};
assert(local_data_size <= sizeof(entry->local_data.buf));
assert(local_data_size % 8 == 0);
#ifdef DQLITE_NEXT
memcpy(entry->local_data.buf, cursor, local_data_size);
cursor += local_data_size;
#endif
}
return 0;
}
Loading
Loading