Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log compaction #42

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static: $(OBJECTS)
ar -r libraft.a $(OBJECTS)

.PHONY: tests
tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c
tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_snapshotting.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c
$(CC) $(CFLAGS) -o tests_main $^
./tests_main
gcov raft_server.c
Expand Down
28 changes: 24 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ See `raft.h <https://github.com/willemt/raft/blob/master/include/raft.h>`_ for f

See `ticketd <https://github.com/willemt/ticketd>`_ for real life use of this library.

Networking is out of scope for this project. The implementor will need to do all the plumbing.
Networking is out of scope for this project. The implementor will need to do all the plumbing. The library doesn't assume a network layer with ordering or duplicate detection. This means you could use UDP for transmission.

There are no dependencies, however https://github.com/willemt/linked-List-queue is required for testing.

Expand Down Expand Up @@ -428,7 +428,27 @@ It's highly recommended that when a node is added to the cluster that its node I

3. Once the ``RAFT_LOGTYPE_REMOVE_NODE`` configuration change log is applied in the ``applylog`` callback we shutdown the server if it is to be removed.

Todo
====
Log Compaction
--------------
The log compaction method supported is called "Snapshotting for memory-based state machines" (Ongaro, 2014)

- Log compaction
This library does not send snapshots (ie. there are NO send_snapshot, recv_snapshot callbacks to implement). The user has to send the snapshot outside of this library. The implementor has to serialize and deserialize the snapshot.

The process works like this:

1. Begin snapshotting with ``raft_begin_snapshot``.
2. Save the current membership details to the snapshot.
3. Save the finite state machine to the snapshot.
4. End snapshotting with ``raft_end_snapshot``.
5. When the ``send_snapshot`` callback fires, the user must propogate the snapshot to the other node.
6. Once the peer has the snapshot, they call ``raft_begin_load_snapshot``.
7. Call ``raft_add_node`` to add nodes as per the snapshot's membership info.
8. Call ``raft_node_set_voting`` to nodes as per the snapshot's membership info.
9. Call ``raft_node_set_active`` to nodes as per the snapshot's membership info.
10. Finally, call ``raft_node_set_active`` to nodes as per the snapshot's membership info.

When a node receives a snapshot it could reuse that snapshot itself for other nodes.

References
==========
Ongaro, D. (2014). Consensus: bridging theory and practice. Retrieved from https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf
195 changes: 189 additions & 6 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3
#define RAFT_ERR_SHUTDOWN -4
#define RAFT_ERR_NOMEM -5
#define RAFT_ERR_NEEDS_SNAPSHOT -6
#define RAFT_ERR_SNAPSHOT_IN_PROGRESS -7
#define RAFT_ERR_SNAPSHOT_ALREADY_LOADED -8
#define RAFT_ERR_LAST -100

#define RAFT_REQUESTVOTE_ERR_GRANTED 1
Expand All @@ -28,12 +31,43 @@ typedef enum {
} raft_state_e;

typedef enum {
/**
* Regular log type.
* This is solely for application data intended for the FSM.
*/
RAFT_LOGTYPE_NORMAL,
/**
* Membership change.
* Non-voting nodes can't cast votes or start elections.
* Nodes in this non-voting state are used to catch up with the cluster,
* when trying to the join the cluster.
*/
RAFT_LOGTYPE_ADD_NONVOTING_NODE,
/**
* Membership change.
* Add a voting node.
*/
RAFT_LOGTYPE_ADD_NODE,
/**
* Membership change.
* Nodes become demoted when we want to remove them from the cluster.
* Demoted nodes can't take part in voting or start elections.
* Demoted nodes become inactive, as per raft_node_is_active.
*/
RAFT_LOGTYPE_DEMOTE_NODE,
/**
* Membership change.
* The node is removed from the cluster.
* This happens after the node has been demoted.
* Removing nodes is a 2 step process: first demote, then remove.
*/
RAFT_LOGTYPE_REMOVE_NODE,
RAFT_LOGTYPE_NUM,
RAFT_LOGTYPE_SNAPSHOT,
/**
* Users can piggyback the entry mechanism by specifying log types that
* are higher than RAFT_LOGTYPE_NUM.
*/
RAFT_LOGTYPE_NUM=100,
} raft_logtype_e;

typedef struct
Expand Down Expand Up @@ -190,6 +224,22 @@ typedef int (
msg_appendentries_t* msg
);

/**
* Log compaction
* Callback for telling the user to send a snapshot.
*
* @param[in] raft Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] node Node's ID that needs a snapshot sent to
**/
typedef int (
*func_send_snapshot_f
) (
raft_server_t* raft,
void *user_data,
raft_node_t* node
);

/** Callback for detecting when non-voting nodes have obtained enough logs.
* This triggers only when there are no pending configuration changes.
* @param[in] raft The Raft server making this callback
Expand Down Expand Up @@ -236,6 +286,23 @@ typedef int (
int vote
);

/** Callback for saving the snapshot we received.
* For safety reasons this callback MUST flush the change to disk.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] idx The log entry index we want to persist
* @param[in] term The log entry term we want to persist
* @return 0 on success */
typedef int (
*func_persist_snapshot_metadata_f
) (
raft_server_t* raft,
void *user_data,
int idx,
int term
);


/** Callback for saving current term (and nil vote) to disk.
* For safety reasons this callback MUST flush the term and vote changes to
* disk atomically.
Expand Down Expand Up @@ -290,6 +357,9 @@ typedef struct
/** Callback for sending appendentries messages */
func_send_appendentries_f send_appendentries;

/** Callback for notifying user that a node needs a snapshot sent */
func_send_snapshot_f send_snapshot;

/** Callback for finite state machine application
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
Expand All @@ -304,6 +374,9 @@ typedef struct
* disk atomically. */
func_persist_term_f persist_term;

/** Callback for persisting snapshot metadata */
func_persist_snapshot_metadata_f persist_snapshot_metadata;

/** Callback for adding an entry to the log
* For safety reasons this callback MUST flush the change to disk.
* Return 0 on success.
Expand Down Expand Up @@ -415,7 +488,7 @@ void raft_set_request_timeout(raft_server_t* me, int msec);
* @return
* 0 on success;
* -1 on failure;
* RAFT_ERR_SHUTDOWN when server should be shutdown */
* RAFT_ERR_SHUTDOWN when server should shutdown */
int raft_periodic(raft_server_t* me, int msec_elapsed);

/** Receive an appendentries message.
Expand All @@ -434,7 +507,10 @@ int raft_periodic(raft_server_t* me, int msec_elapsed);
* @param[in] node The node who sent us this message
* @param[in] ae The appendentries message
* @param[out] r The resulting response
* @return 0 on success */
* @return
* 0 on success
* RAFT_ERR_NEEDS_SNAPSHOT
* */
int raft_recv_appendentries(raft_server_t* me,
raft_node_t* node,
msg_appendentries_t* ae,
Expand Down Expand Up @@ -466,7 +542,7 @@ int raft_recv_requestvote(raft_server_t* me,
* @param[in] r The requestvote response message
* @return
* 0 on success;
* RAFT_ERR_SHUTDOWN server should be shutdown; */
* RAFT_ERR_SHUTDOWN server MUST shutdown; */
int raft_recv_requestvote_response(raft_server_t* me,
raft_node_t* node,
msg_requestvote_response_t* r);
Expand Down Expand Up @@ -497,7 +573,7 @@ int raft_recv_requestvote_response(raft_server_t* me,
* @return
* 0 on success;
* RAFT_ERR_NOT_LEADER server is not the leader;
* RAFT_ERR_SHUTDOWN server should be shutdown;
* RAFT_ERR_SHUTDOWN server MUST shutdown;
* RAFT_ERR_ONE_VOTING_CHANGE_ONLY there is a non-voting change inflight;
* RAFT_ERR_NOMEM memory allocation failure
*/
Expand Down Expand Up @@ -680,10 +756,14 @@ void raft_node_set_voting(raft_node_t* node, int voting);
* @return 1 if this is a voting node. Otherwise 0. */
int raft_node_is_voting(raft_node_t* me_);

/** Check if a node has sufficient logs to be able to join the cluster.
**/
int raft_node_has_sufficient_logs(raft_node_t* me_);

/** Apply all entries up to the commit index
* @return
* 0 on success;
* RAFT_ERR_SHUTDOWN when server should be shutdown */
* RAFT_ERR_SHUTDOWN when server MUST shutdown */
int raft_apply_all(raft_server_t* me_);

/** Become leader
Expand All @@ -705,4 +785,107 @@ int raft_entry_is_voting_cfg_change(raft_entry_t* ety);
* @return 1 if this is a configuration change. */
int raft_entry_is_cfg_change(raft_entry_t* ety);

raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);

/** Begin snapshotting.
*
* While snapshotting, raft will:
* - not apply log entries
* - not start elections
*
* @return 0 on success
*
**/
int raft_begin_snapshot(raft_server_t *me_);

/** Stop snapshotting.
*
* The user MUST include membership changes inside the snapshot. This means
* that membership changes are included in the size of the snapshot. For peers
* that load the snapshot, the user needs to deserialize the snapshot to
* obtain the membership changes.
*
* The user MUST compact the log up to the commit index. This means all
* log entries up to the commit index MUST be deleted (aka polled).
*
* @return
* 0 on success
* -1 on failure
**/
int raft_end_snapshot(raft_server_t *me_);

/** Get the entry index of the entry that was snapshotted
**/
int raft_get_snapshot_entry_idx(raft_server_t *me_);

/** Check is a snapshot is in progress
**/
int raft_snapshot_is_in_progress(raft_server_t *me_);

/** Remove the first log entry.
* This should be used for compacting logs.
* @return 0 on success
**/
int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety);

/** Get last applied entry
**/
raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);

int raft_get_first_entry_idx(raft_server_t* me_);

/** Start loading snapshot
*
* This is usually the result of a snapshot being loaded.
* We need to send an appendentries response.
*
* @param[in] last_included_term Term of the last log of the snapshot
* @param[in] last_included_index Index of the last log of the snapshot
*
* @return
* 0 on success
* -1 on failure
* RAFT_ERR_SNAPSHOT_ALREADY_LOADED
**/
int raft_begin_load_snapshot(raft_server_t *me_,
int last_included_term,
int last_included_index);

/** Stop loading snapshot.
*
* @return
* 0 on success
* -1 on failure
**/
int raft_end_load_snapshot(raft_server_t *me_);

int raft_get_snapshot_last_idx(raft_server_t *me_);

int raft_get_snapshot_last_term(raft_server_t *me_);

/** Check if a node is active.
* Active nodes could become voting nodes.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_active(raft_node_t* me_);

/** Make the node active.
*
* The user sets this to 1 between raft_begin_load_snapshot and
* raft_end_load_snapshot.
*
* @param[in] active Set a node as active if this is 1
**/
void raft_node_set_active(raft_node_t* me_, int active);

/** Check if a node's voting status has been committed.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_voting_committed(raft_node_t* me_);

/** Check if a node's membership to the cluster has been committed.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_addition_committed(raft_node_t* me_);

#endif /* RAFT_H_ */
7 changes: 7 additions & 0 deletions include/raft_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ void log_empty(log_t * me_);
* Remove oldest entry. Set *etyp to oldest entry on success. */
int log_poll(log_t * me_, void** etyp);

/** Get an array of entries from this index onwards.
* This is used for batching.
*/
raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys);

raft_entry_t* log_get_at_idx(log_t* me_, int idx);
Expand All @@ -46,4 +49,8 @@ raft_entry_t *log_peektail(log_t * me_);

int log_get_current_idx(log_t* me_);

int log_load_from_snapshot(log_t *me_, int idx, int term);

int log_get_base(log_t* me_);

#endif /* RAFT_LOG_H_ */
Loading