-
Notifications
You must be signed in to change notification settings - Fork 556
/
node.rs
2088 lines (1866 loc) · 86.4 KB
/
node.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use super::log::{Index, Log};
use super::message::{Envelope, Message, ReadSequence, Request, RequestID, Response, Status};
use super::state::State;
use crate::errinput;
use crate::error::{Error, Result};
use crossbeam::channel::Sender;
use itertools::Itertools as _;
use log::{debug, info};
use rand::Rng as _;
use std::collections::{HashMap, HashSet, VecDeque};
/// A node ID. Unique within a cluster. Assigned manually when started.
pub type NodeID = u8;
/// A leader term number. Increases monotonically; at most one leader can be
/// elected per term.
pub type Term = u64;
/// A logical clock interval as number of ticks. Time only advances via
/// `tick()` calls, there is no wall-clock time.
pub type Ticks = u8;
/// Raft node options.
#[derive(Clone, Debug, PartialEq)]
pub struct Options {
    /// The number of ticks between leader heartbeats.
    pub heartbeat_interval: Ticks,
    /// The range of randomized election timeouts for followers and candidates.
    /// Randomization reduces the chance of election ties when several nodes
    /// time out simultaneously.
    pub election_timeout_range: std::ops::Range<Ticks>,
    /// Maximum number of entries to send in a single Append message.
    pub max_append_entries: usize,
}
impl Default for Options {
fn default() -> Self {
Self {
heartbeat_interval: super::HEARTBEAT_INTERVAL,
election_timeout_range: super::ELECTION_TIMEOUT_RANGE,
max_append_entries: super::MAX_APPEND_ENTRIES,
}
}
}
/// A Raft node with a dynamic role. This implements the Raft distributed
/// consensus protocol, see the `raft` module documentation for more info.
///
/// The node is driven synchronously by processing inbound messages via `step()`
/// and by advancing time via `tick()`. These methods consume the node and
/// return a new one with a possibly different role. Outbound messages are sent
/// via the given `tx` channel, and must be delivered to peers or clients.
///
/// This enum is the public interface to the node, with a closed set of roles.
/// It wraps the `RawNode<Role>` types, which implement the actual node logic.
/// The enum allows ergonomic use across role transitions since it can represent
/// all roles, e.g.: `node = node.step()?`.
///
/// Nodes start out as leaderless followers, see `Node::new`.
pub enum Node {
    /// A candidate campaigns for leadership.
    Candidate(RawNode<Candidate>),
    /// A follower replicates entries from a leader.
    Follower(RawNode<Follower>),
    /// A leader processes client requests and replicates entries to followers.
    Leader(RawNode<Leader>),
}
/// Helper macro which calls a closure on the inner RawNode<Role>.
///
/// The inner generic `fn with<R: Role, T>` lets a single closure literal be
/// applied to each role's `RawNode<R>` in the match arms below. Three variants
/// are provided: moved (`$node`), borrowed (`ref $node`), and mutably borrowed
/// (`ref mut $node`).
macro_rules! with_rawnode {
    // Node is moved.
    ($node:expr, $closure:expr) => {{
        fn with<R: Role, T>(node: RawNode<R>, f: impl FnOnce(RawNode<R>) -> T) -> T {
            f(node)
        }
        match $node {
            Node::Candidate(node) => with(node, $closure),
            Node::Follower(node) => with(node, $closure),
            Node::Leader(node) => with(node, $closure),
        }
    }};
    // Node is borrowed (ref).
    (ref $node:expr, $closure:expr) => {{
        fn with<R: Role, T>(node: &RawNode<R>, f: impl FnOnce(&RawNode<R>) -> T) -> T {
            f(node)
        }
        match $node {
            Node::Candidate(ref node) => with(node, $closure),
            Node::Follower(ref node) => with(node, $closure),
            Node::Leader(ref node) => with(node, $closure),
        }
    }};
    // Node is mutably borrowed (ref mut).
    (ref mut $node:expr, $closure:expr) => {{
        fn with<R: Role, T>(node: &mut RawNode<R>, f: impl FnOnce(&mut RawNode<R>) -> T) -> T {
            f(node)
        }
        match $node {
            Node::Candidate(ref mut node) => with(node, $closure),
            Node::Follower(ref mut node) => with(node, $closure),
            Node::Leader(ref mut node) => with(node, $closure),
        }
    }};
}
impl Node {
    /// Creates a new Raft node. It starts as a leaderless follower, waiting to
    /// hear from a leader or otherwise transitioning to candidate and
    /// campaigning for leadership. In the case of a single-node cluster (no
    /// peers), the node immediately transitions to leader when created.
    pub fn new(
        id: NodeID,
        peers: HashSet<NodeID>,
        log: Log,
        state: Box<dyn State>,
        tx: Sender<Envelope>,
        opts: Options,
    ) -> Result<Self> {
        let node = RawNode::new(id, peers, log, state, tx, opts)?;
        // If this is a single-node cluster, become leader immediately. The
        // candidate step is still taken so the term/vote invariants hold.
        if node.cluster_size() == 1 {
            return Ok(node.into_candidate()?.into_leader()?.into());
        }
        Ok(node.into())
    }

    /// Returns the node ID.
    pub fn id(&self) -> NodeID {
        with_rawnode!(ref self, |n| n.id)
    }

    /// Returns the node term.
    pub fn term(&self) -> Term {
        with_rawnode!(ref self, |n| n.term())
    }

    /// Processes an inbound message, returning the (possibly role-changed)
    /// node. Panics if the message is addressed to a different node or comes
    /// from an unknown sender.
    pub fn step(self, msg: Envelope) -> Result<Self> {
        with_rawnode!(self, |n| {
            assert_eq!(msg.to, n.id, "message to other node: {msg:?}");
            assert!(n.peers.contains(&msg.from) || msg.from == n.id, "unknown sender: {msg:?}");
            debug!("Stepping {msg:?}");
            n.step(msg)
        })
    }

    /// Advances time by a tick, returning the (possibly role-changed) node.
    pub fn tick(self) -> Result<Self> {
        with_rawnode!(self, |n| n.tick())
    }
}
impl From<RawNode<Candidate>> for Node {
fn from(n: RawNode<Candidate>) -> Self {
Node::Candidate(n)
}
}
impl From<RawNode<Follower>> for Node {
fn from(n: RawNode<Follower>) -> Self {
Node::Follower(n)
}
}
impl From<RawNode<Leader>> for Node {
fn from(n: RawNode<Leader>) -> Self {
Node::Leader(n)
}
}
/// Marker trait for a Raft role: leader, follower, or candidate. Implemented
/// by the `Leader`, `Follower`, and `Candidate` role structs below.
pub trait Role {}
/// A Raft node with role R.
///
/// This implements the typestate pattern, where individual node states (roles)
/// are encoded as RawNode<Role>. See: http:https://cliffle.com/blog/rust-typestate/
pub struct RawNode<R: Role> {
    /// The node ID. Must be unique in this cluster.
    id: NodeID,
    /// The IDs of the other nodes in the cluster. Does not change while
    /// running. Can change on restart, but all nodes must have the same node
    /// set to avoid multiple leaders (i.e. split brain).
    peers: HashSet<NodeID>,
    /// The Raft log, containing client commands to be executed.
    log: Log,
    /// The Raft state machine, on which client commands are executed.
    state: Box<dyn State>,
    /// Channel for sending outbound messages to other nodes.
    tx: Sender<Envelope>,
    /// Node options.
    opts: Options,
    /// Role-specific state. Replaced wholesale on role transitions.
    role: R,
}
impl<R: Role> RawNode<R> {
    /// Helper for role transitions: moves all shared state into a node with
    /// the given new role, dropping the old role state.
    fn into_role<T: Role>(self, role: T) -> RawNode<T> {
        let RawNode { id, peers, log, state, tx, opts, role: _ } = self;
        RawNode { id, peers, log, state, tx, opts, role }
    }

    /// Returns the node's current term. Convenience wrapper for Log.get_term().
    fn term(&self) -> Term {
        let (term, _vote) = self.log.get_term();
        term
    }

    /// Returns the cluster size as number of nodes (peers plus self).
    fn cluster_size(&self) -> usize {
        1 + self.peers.len()
    }

    /// Returns the cluster quorum size (strict majority).
    fn quorum_size(&self) -> usize {
        self.cluster_size() / 2 + 1
    }

    /// Returns the quorum value of the given unsorted vector, i.e. the value
    /// at the quorum position when sorted in descending order. The vector must
    /// have exactly one element per cluster node.
    fn quorum_value<T: Ord + Copy>(&self, mut values: Vec<T>) -> T {
        assert_eq!(values.len(), self.cluster_size(), "vector size must match cluster size");
        // Partially sort descending, then take the element at the quorum index.
        let (_, value, _) = values.select_nth_unstable_by(self.quorum_size() - 1, |a, b| b.cmp(a));
        *value
    }

    /// Generates a new randomized election timeout.
    fn random_election_timeout(&self) -> Ticks {
        let timeout_range = self.opts.election_timeout_range.clone();
        rand::thread_rng().gen_range(timeout_range)
    }

    /// Sends a message to the given recipient, stamped with our ID and term.
    fn send(&self, to: NodeID, message: Message) -> Result<()> {
        let envelope = Envelope { from: self.id, to, term: self.term(), message };
        Self::send_with(&self.tx, envelope)
    }

    /// Sends a message without borrowing self, to allow partial borrows.
    fn send_with(tx: &Sender<Envelope>, msg: Envelope) -> Result<()> {
        debug!("Sending {msg:?}");
        tx.send(msg)?;
        Ok(())
    }

    /// Broadcasts a message to all peers, in increasing ID order for test
    /// determinism.
    fn broadcast(&self, message: Message) -> Result<()> {
        let mut ids: Vec<NodeID> = self.peers.iter().copied().collect();
        ids.sort_unstable();
        for id in ids {
            self.send(id, message.clone())?;
        }
        Ok(())
    }
}
/// A follower replicates log entries from a leader and forwards client
/// requests. Nodes start as leaderless followers, until they either discover a
/// leader or hold an election.
pub struct Follower {
    /// The leader, or None if we're a leaderless follower.
    leader: Option<NodeID>,
    /// The number of ticks since the last message from the leader.
    leader_seen: Ticks,
    /// The leader_seen timeout before triggering an election.
    election_timeout: Ticks,
    /// Local client requests that have been forwarded to the leader. These are
    /// aborted on leader/term changes, and the client must retry them.
    forwarded: HashSet<RequestID>,
}
impl Follower {
/// Creates a new follower role.
fn new(leader: Option<NodeID>, election_timeout: Ticks) -> Self {
Self { leader, leader_seen: 0, election_timeout, forwarded: HashSet::new() }
}
}
impl Role for Follower {}
impl RawNode<Follower> {
    /// Creates a new node as a leaderless follower. Errors if the node's own
    /// ID is listed in `peers`.
    fn new(
        id: NodeID,
        peers: HashSet<NodeID>,
        log: Log,
        state: Box<dyn State>,
        tx: Sender<Envelope>,
        opts: Options,
    ) -> Result<Self> {
        if peers.contains(&id) {
            return errinput!("node ID {id} can't be in peers");
        }
        let role = Follower::new(None, 0);
        let mut node = Self { id, peers, log, state, tx, opts, role };
        // Randomize the election timeout to avoid simultaneous elections.
        node.role.election_timeout = node.random_election_timeout();
        // Apply any pending entries following restart. Unlike the Raft log,
        // state machine writes are not flushed to durable storage, so a tail of
        // writes may be lost if the OS crashes or restarts.
        node.maybe_apply()?;
        Ok(node)
    }

    /// Transitions the follower into a candidate, by campaigning for
    /// leadership in a new term.
    fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
        // Abort any forwarded requests. These must be retried with new leader.
        self.abort_forwarded()?;
        // Apply any pending log entries, so that we're caught up if we win.
        self.maybe_apply()?;
        // Become candidate and campaign.
        let election_timeout = self.random_election_timeout();
        let mut node = self.into_role(Candidate::new(election_timeout));
        node.campaign()?;
        // Sanity-check the post-campaign invariants.
        let (term, vote) = node.log.get_term();
        assert!(node.role.votes.contains(&node.id), "candidate did not vote for self");
        assert_ne!(term, 0, "candidate can't have term 0");
        assert_eq!(vote, Some(node.id), "log vote does not match self");
        Ok(node)
    }

    /// Transitions the follower into either a leaderless follower in a new term
    /// (e.g. if someone holds a new election) or a follower of a current leader.
    fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
        assert_ne!(term, 0, "can't become follower in term 0");
        // Abort any forwarded requests. These must be retried with new leader.
        self.abort_forwarded()?;
        if let Some(leader) = leader {
            // We found a leader in the current term.
            assert!(self.peers.contains(&leader), "leader is not a peer");
            assert_eq!(self.role.leader, None, "already have leader in term");
            assert_eq!(term, self.term(), "can't follow leader in different term");
            info!("Following leader {leader} in term {term}");
            // Keep the existing election timeout; only the leader changed.
            self.role = Follower::new(Some(leader), self.role.election_timeout);
        } else {
            // We found a new term, but we don't know who the leader is yet.
            // We'll find out if we step a message from it.
            assert_ne!(term, self.term(), "can't become leaderless follower in current term");
            info!("Discovered new term {term}");
            self.log.set_term(term, None)?;
            self.role = Follower::new(None, self.random_election_timeout());
        }
        Ok(self)
    }

    /// Processes an inbound message.
    fn step(mut self, msg: Envelope) -> Result<Node> {
        // Past term: drop the message.
        if msg.term < self.term() {
            debug!("Dropping message from past term: {msg:?}");
            return Ok(self.into());
        }
        // Future term: become leaderless follower and step the message.
        if msg.term > self.term() {
            return self.into_follower(msg.term, None)?.step(msg);
        }
        // Record when we last saw a message from the leader (if any). This
        // resets the election timeout countdown in tick().
        if Some(msg.from) == self.role.leader {
            self.role.leader_seen = 0
        }
        match msg.message {
            // The leader sends periodic heartbeats. If we don't have a leader
            // yet, follow it. If the commit_index advances, apply commands.
            Message::Heartbeat { last_index, commit_index, read_seq } => {
                assert!(commit_index <= last_index, "commit_index after last_index");
                // Make sure the heartbeat is from our leader, or follow it.
                match self.role.leader {
                    Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
                    None => self = self.into_follower(msg.term, Some(msg.from))?,
                }
                // Attempt to match the leader's log and respond to the
                // heartbeat. last_index always has the leader's term.
                let match_index = if self.log.has(last_index, msg.term)? { last_index } else { 0 };
                self.send(msg.from, Message::HeartbeatResponse { match_index, read_seq })?;
                // Advance the commit index and apply entries. We can only do
                // this if we matched the leader's last_index, which implies
                // that the logs are identical up to match_index. This also
                // implies that the commit_index is present in our log.
                if match_index != 0 && commit_index > self.log.get_commit_index().0 {
                    self.log.commit(commit_index)?;
                    self.maybe_apply()?;
                }
            }
            // Append log entries from the leader to the local log.
            Message::Append { base_index, base_term, entries } => {
                // Sanity check: the entries must directly follow base_index.
                if let Some(first) = entries.first() {
                    assert_eq!(base_index, first.index - 1, "base index mismatch");
                }
                // Make sure the append is from our leader, or follow it.
                match self.role.leader {
                    Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
                    None => self = self.into_follower(msg.term, Some(msg.from))?,
                }
                // If the base entry matches our log, append the entries.
                let (mut reject_index, mut match_index) = (0, 0);
                if base_index == 0 || self.log.has(base_index, base_term)? {
                    match_index = entries.last().map(|e| e.index).unwrap_or(base_index);
                    self.log.splice(entries)?;
                } else {
                    // Otherwise, reject the base index. If the local log is
                    // shorter than the base index, lower the reject index to
                    // skip all missing entries.
                    reject_index = std::cmp::min(base_index, self.log.get_last_index().0 + 1);
                }
                self.send(msg.from, Message::AppendResponse { reject_index, match_index })?;
            }
            // A candidate is requesting our vote. We'll only grant one.
            Message::Campaign { last_index, last_term } => {
                // Don't vote if we already voted for someone else in this term.
                // We can repeat our vote though.
                if let (_, Some(vote)) = self.log.get_term() {
                    if msg.from != vote {
                        self.send(msg.from, Message::CampaignResponse { vote: false })?;
                        return Ok(self.into());
                    }
                }
                // Don't vote if our log is newer than the candidate's log.
                // This ensures that an elected leader has all committed
                // entries, see section 5.4.1 in the Raft paper.
                let (log_index, log_term) = self.log.get_last_index();
                if log_term > last_term || log_term == last_term && log_index > last_index {
                    self.send(msg.from, Message::CampaignResponse { vote: false })?;
                    return Ok(self.into());
                }
                // Grant the vote, durably recording it in the log.
                info!("Voting for {} in term {} election", msg.from, msg.term);
                self.log.set_term(msg.term, Some(msg.from))?;
                self.send(msg.from, Message::CampaignResponse { vote: true })?;
            }
            // Forward client requests to the leader, or abort them if there is
            // none. These will not be retried, the client should use timeouts.
            // Local client requests use our node ID as the sender.
            Message::ClientRequest { id, request: _ } => {
                assert_eq!(msg.from, self.id, "client request from other node");
                if let Some(leader) = self.role.leader {
                    debug!("Forwarding request to leader {leader}: {msg:?}");
                    self.role.forwarded.insert(id);
                    self.send(leader, msg.message)?
                } else {
                    let response = Err(Error::Abort);
                    self.send(msg.from, Message::ClientResponse { id, response })?
                }
            }
            // Client responses from the leader are passed on to the client.
            Message::ClientResponse { id, response } => {
                assert_eq!(Some(msg.from), self.role.leader, "client response from non-leader");
                // Only respond if the request is still tracked; it may have
                // been aborted on a leader/term change.
                if self.role.forwarded.remove(&id) {
                    self.send(self.id, Message::ClientResponse { id, response })?;
                }
            }
            // We may receive a vote after we lost an election, ignore it.
            Message::CampaignResponse { .. } => {}
            // We're not leader this term, so we shouldn't see these.
            Message::HeartbeatResponse { .. } | Message::AppendResponse { .. } => {
                panic!("unexpected message {msg:?}")
            }
        };
        Ok(self.into())
    }

    /// Processes a logical clock tick. If we haven't heard from the leader
    /// within the election timeout, become candidate and campaign.
    fn tick(mut self) -> Result<Node> {
        self.role.leader_seen += 1;
        if self.role.leader_seen >= self.role.election_timeout {
            return Ok(self.into_candidate()?.into());
        }
        Ok(self.into())
    }

    /// Aborts all forwarded requests (e.g. on term/leader changes). The
    /// clients must retry against the new leader.
    fn abort_forwarded(&mut self) -> Result<()> {
        // Sort by ID for test determinism.
        for id in std::mem::take(&mut self.role.forwarded).into_iter().sorted() {
            debug!("Aborting forwarded request {id}");
            self.send(self.id, Message::ClientResponse { id, response: Err(Error::Abort) })?;
        }
        Ok(())
    }

    /// Applies any pending log entries, i.e. committed entries above the
    /// state machine's applied index.
    fn maybe_apply(&mut self) -> Result<()> {
        let mut iter = self.log.scan_apply(self.state.get_applied_index());
        while let Some(entry) = iter.next().transpose()? {
            debug!("Applying {entry:?}");
            // Throw away the result, since only the leader responds to clients.
            // This includes errors -- any non-deterministic errors (e.g. IO
            // errors) must panic instead to avoid replica divergence.
            _ = self.state.apply(entry);
        }
        Ok(())
    }
}
/// A candidate is campaigning to become a leader.
pub struct Candidate {
    /// Votes received (including our own vote for ourself).
    votes: HashSet<NodeID>,
    /// Ticks elapsed since election start. The election is retried once it
    /// reaches `election_timeout`.
    election_duration: Ticks,
    /// Election timeout, in ticks.
    election_timeout: Ticks,
}
impl Candidate {
/// Creates a new candidate role.
fn new(election_timeout: Ticks) -> Self {
Self { votes: HashSet::new(), election_duration: 0, election_timeout }
}
}
impl Role for Candidate {}
impl RawNode<Candidate> {
    /// Transitions the candidate to a follower. We either lost the election and
    /// follow the winner, or we discovered a new term and step into it as a
    /// leaderless follower.
    fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
        let election_timeout = self.random_election_timeout();
        if let Some(leader) = leader {
            // We lost the election, follow the winner.
            assert_eq!(term, self.term(), "can't follow leader in different term");
            info!("Lost election, following leader {leader} in term {term}");
            Ok(self.into_role(Follower::new(Some(leader), election_timeout)))
        } else {
            // We found a new term, but we don't necessarily know who the leader
            // is yet. We'll find out when we step a message from it.
            assert_ne!(term, self.term(), "can't become leaderless follower in current term");
            info!("Discovered new term {term}");
            self.log.set_term(term, None)?;
            Ok(self.into_role(Follower::new(None, election_timeout)))
        }
    }

    /// Transitions the candidate to a leader. We won the election.
    fn into_leader(self) -> Result<RawNode<Leader>> {
        // Sanity-check that we hold a valid vote for ourself in this term.
        let (term, vote) = self.log.get_term();
        assert_ne!(term, 0, "leaders can't have term 0");
        assert_eq!(vote, Some(self.id), "leader did not vote for self");
        info!("Won election for term {term}, becoming leader");
        let peers = self.peers.clone();
        let (last_index, _) = self.log.get_last_index();
        let mut node = self.into_role(Leader::new(peers, last_index));
        // Propose an empty command when assuming leadership, to disambiguate
        // previous entries in the log. See section 5.4.2 in the Raft paper.
        // We do this prior to the heartbeat, to avoid a wasted replication
        // roundtrip if the heartbeat response indicates the peer is behind.
        node.propose(None)?;
        node.maybe_commit_and_apply()?;
        node.heartbeat()?;
        Ok(node)
    }

    /// Processes an inbound message.
    fn step(mut self, msg: Envelope) -> Result<Node> {
        // Past term: drop the message.
        if msg.term < self.term() {
            debug!("Dropping message from past term: {msg:?}");
            return Ok(self.into());
        }
        // Future term: become leaderless follower and step the message.
        if msg.term > self.term() {
            return self.into_follower(msg.term, None)?.step(msg);
        }
        match msg.message {
            // If we received a vote, record it. If the vote gives us quorum,
            // assume leadership.
            Message::CampaignResponse { vote: true } => {
                self.role.votes.insert(msg.from);
                if self.role.votes.len() >= self.quorum_size() {
                    return Ok(self.into_leader()?.into());
                }
            }
            // We didn't get the vote. :(
            Message::CampaignResponse { vote: false } => {}
            // Don't grant votes for other candidates.
            Message::Campaign { .. } => {
                self.send(msg.from, Message::CampaignResponse { vote: false })?
            }
            // If we hear from a leader in this term, we lost the election.
            // Follow it and step the message.
            Message::Heartbeat { .. } | Message::Append { .. } => {
                return self.into_follower(msg.term, Some(msg.from))?.step(msg);
            }
            // Abort client requests while campaigning. The client must retry.
            Message::ClientRequest { id, request: _ } => {
                self.send(msg.from, Message::ClientResponse { id, response: Err(Error::Abort) })?;
            }
            // We're not a leader in this term, nor are we forwarding requests,
            // so we shouldn't see these.
            Message::HeartbeatResponse { .. }
            | Message::AppendResponse { .. }
            | Message::ClientResponse { .. } => panic!("unexpected message {msg:?}"),
        }
        Ok(self.into())
    }

    /// Processes a logical clock tick. If the election times out without a
    /// winner, start a new one.
    fn tick(mut self) -> Result<Node> {
        self.role.election_duration += 1;
        if self.role.election_duration >= self.role.election_timeout {
            self.campaign()?;
        }
        Ok(self.into())
    }

    /// Hold a new election by increasing the term, voting for ourself, and
    /// soliciting votes from all peers.
    fn campaign(&mut self) -> Result<()> {
        let term = self.term() + 1;
        info!("Starting new election for term {term}");
        // Reset the role state, discarding any votes from the previous term.
        self.role = Candidate::new(self.random_election_timeout());
        self.role.votes.insert(self.id); // vote for ourself
        self.log.set_term(term, Some(self.id))?;
        let (last_index, last_term) = self.log.get_last_index();
        self.broadcast(Message::Campaign { last_index, last_term })
    }
}
/// A leader serves client requests and replicates the log to followers.
/// If the leader loses leadership, all client requests are aborted.
pub struct Leader {
    /// Follower replication progress, one entry per peer.
    progress: HashMap<NodeID, Progress>,
    /// Tracks pending write requests by log index. Added when the write is
    /// proposed and appended to the leader's log, and removed when the command
    /// is applied to the state machine, returning the result to the client.
    writes: HashMap<Index, Write>,
    /// Tracks pending read requests. For linearizability, read requests are
    /// assigned a sequence number and only executed once a quorum of nodes have
    /// confirmed it via leader heartbeats. Otherwise, an old leader may serve
    /// stale reads if a new leader has been elected elsewhere.
    reads: VecDeque<Read>,
    /// The read sequence number used for the last read. Initialized to 0 in
    /// this term, and incremented for every read command.
    read_seq: ReadSequence,
    /// Number of ticks since last heartbeat.
    since_heartbeat: Ticks,
}
/// Follower replication progress (in this term).
struct Progress {
    /// The highest index where the follower's log is known to match the leader.
    /// Initialized to 0, increases monotonically.
    match_index: Index,
    /// The next index to replicate to the follower. Initialized to
    /// last_index+1, decreased when probing log mismatches. Always in
    /// the range [match_index+1, last_index+1].
    ///
    /// Entries pending transmission are in the range [next_index, last_index].
    /// Unacknowledged entries are in the range [match_index+1, next_index).
    next_index: Index,
    /// The last read sequence number confirmed by the peer. To avoid stale
    /// reads on leader changes, a read is only served once its sequence number
    /// is confirmed by a quorum.
    read_seq: ReadSequence,
}
impl Progress {
    /// Attempts to advance a follower's match index, returning true if it did.
    /// If next_index is below it, it is advanced to the following index.
    fn advance(&mut self, match_index: Index) -> bool {
        let advanced = match_index > self.match_index;
        if advanced {
            self.match_index = match_index;
            // next_index must always stay above match_index.
            if self.next_index < match_index + 1 {
                self.next_index = match_index + 1;
            }
        }
        advanced
    }

    /// Attempts to advance a follower's read_seq, returning true if it did.
    fn advance_read(&mut self, read_seq: ReadSequence) -> bool {
        if read_seq > self.read_seq {
            self.read_seq = read_seq;
            return true;
        }
        false
    }

    /// Attempts to regress a follower's next index to the given index, returning
    /// true if it did. Won't regress below match_index + 1.
    fn regress_next(&mut self, next_index: Index) -> bool {
        let floor = self.match_index + 1;
        // Never move forwards, and never drop to or below the floor when
        // already there.
        if next_index >= self.next_index || self.next_index <= floor {
            return false;
        }
        self.next_index = if next_index > floor { next_index } else { floor };
        true
    }
}
/// A pending client write request, answered once the command is applied.
struct Write {
    /// The node which submitted the write.
    from: NodeID,
    /// The write request ID.
    id: RequestID,
}
/// A pending client read request, executed once its sequence number is
/// confirmed by a quorum.
struct Read {
    /// The sequence number of this read.
    seq: ReadSequence,
    /// The node which submitted the read.
    from: NodeID,
    /// The read request ID.
    id: RequestID,
    /// The read command.
    command: Vec<u8>,
}
impl Leader {
/// Creates a new leader role.
fn new(peers: HashSet<NodeID>, last_index: Index) -> Self {
let next_index = last_index + 1;
let progress = peers
.into_iter()
.map(|p| (p, Progress { next_index, match_index: 0, read_seq: 0 }))
.collect();
Self {
progress,
writes: HashMap::new(),
reads: VecDeque::new(),
read_seq: 0,
since_heartbeat: 0,
}
}
}
impl Role for Leader {}
impl RawNode<Leader> {
/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Stepping the
/// received message may then follow the new leader, if there is one.
fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
    assert!(term > self.term(), "leader can only become follower in later term");
    info!("Discovered new term {term}");
    // Abort in-flight requests. The client must retry. Sort the requests
    // by ID for test determinism.
    for write in std::mem::take(&mut self.role.writes).into_values().sorted_by_key(|w| w.id) {
        let response = Err(Error::Abort);
        self.send(write.from, Message::ClientResponse { id: write.id, response })?;
    }
    for read in std::mem::take(&mut self.role.reads).into_iter().sorted_by_key(|r| r.id) {
        let response = Err(Error::Abort);
        self.send(read.from, Message::ClientResponse { id: read.id, response })?;
    }
    // Durably record the new term with no vote, then drop the leader state.
    self.log.set_term(term, None)?;
    let election_timeout = self.random_election_timeout();
    Ok(self.into_role(Follower::new(None, election_timeout)))
}
/// Processes an inbound message.
fn step(mut self, msg: Envelope) -> Result<Node> {
// Past term: drop the message.
if msg.term < self.term() {
debug!("Dropping message from past term: {msg:?}");
return Ok(self.into());
}
// Future term: become leaderless follower and step the message.
if msg.term > self.term() {
return self.into_follower(msg.term)?.step(msg);
}
match msg.message {
// A follower received our heartbeat and confirms our leadership.
// We may be able to execute new reads, and we may find that the
// follower's log is lagging and requires us to catch it up.
Message::HeartbeatResponse { match_index, read_seq } => {
let (last_index, _) = self.log.get_last_index();
assert!(match_index <= last_index, "future match index");
assert!(read_seq <= self.role.read_seq, "future read sequence number");
// If the read sequence number advances, try to execute reads.
if self.progress(msg.from).advance_read(read_seq) {
self.maybe_read()?;
}
// If the follower didn't match our last index, an append to it
// must have failed (or it's catching up). Probe it to discover
// a matching entry and start replicating. Move next_index back
// to last_index since the follower just told us it doesn't have
// it (or a previous last_index).
if match_index == 0 {
self.progress(msg.from).regress_next(last_index);
self.maybe_send_append(msg.from, true)?;
}
// If the follower's match index advances, an append response
// got lost. Try to commit and apply.
//
// We don't need to eagerly send any pending entries, since any
// proposals made after this heartbeat was sent should have been
// eagerly replicated in steady state. If not, the next
// heartbeat will trigger a probe above.
if self.progress(msg.from).advance(match_index) {
self.maybe_commit_and_apply()?;
}
}
// A follower appended our log entries (or a probe found a match).
// Record its progress and attempt to commit and apply.
Message::AppendResponse { match_index, reject_index: 0 } if match_index > 0 => {
let (last_index, _) = self.log.get_last_index();
assert!(match_index <= last_index, "future match index");
if self.progress(msg.from).advance(match_index) {
self.maybe_commit_and_apply()?;
}
// Eagerly send any further pending entries. This may be a
// successful probe response, or the peer may be lagging and
// we're catching it up one MAX_APPEND_ENTRIES batch at a time.
self.maybe_send_append(msg.from, false)?;
}
// A follower rejected an append because the base entry in
// reject_index did not match its log. Probe the previous entry by
// sending an empty append until we find a common base.
//
// This linear probing can be slow with long divergent logs, but we
// keep it simple. See also section 5.3 in the Raft paper.
Message::AppendResponse { reject_index, match_index: 0 } if reject_index > 0 => {
let (last_index, _) = self.log.get_last_index();
assert!(reject_index <= last_index, "future reject index");
// If the rejected base index is at or below the match index,
// the rejection is stale and can be ignored.
if reject_index <= self.progress(msg.from).match_index {
return Ok(self.into());
}
// Probe below the reject index, if we haven't already moved
// next_index below it. This avoids sending duplicate probes
// (heartbeats will trigger retries if they're lost).
if self.progress(msg.from).regress_next(reject_index) {
self.maybe_send_append(msg.from, true)?;
}
}
// AppendResponses must set either match_index or reject_index.
Message::AppendResponse { .. } => panic!("invalid message {msg:?}"),
// A client submitted a write request. Propose it, and wait until
// it's replicated and applied to the state machine before returning
// the response to the client.
Message::ClientRequest { id, request: Request::Write(command) } => {
let index = self.propose(Some(command))?;
self.role.writes.insert(index, Write { from: msg.from, id });
if self.cluster_size() == 1 {
self.maybe_commit_and_apply()?;
}
}
// A client submitted a read request. To ensure linearizability, we
// must confirm that we are still the leader by sending a heartbeat
// with the read's sequence number and wait for quorum confirmation.
Message::ClientRequest { id, request: Request::Read(command) } => {
self.role.read_seq += 1;
let read = Read { seq: self.role.read_seq, from: msg.from, id, command };
self.role.reads.push_back(read);
self.heartbeat()?;
if self.cluster_size() == 1 {
self.maybe_read()?;
}
}
// A client submitted a status command.
Message::ClientRequest { id, request: Request::Status } => {
let response = self.status().map(Response::Status);
self.send(msg.from, Message::ClientResponse { id, response })?;
}
// Don't grant any votes (we've already voted for ourself).
Message::Campaign { .. } => {
self.send(msg.from, Message::CampaignResponse { vote: false })?
}
// Votes can come in after we won the election, ignore them.
Message::CampaignResponse { .. } => {}
// There can't be another leader in this term.
Message::Heartbeat { .. } | Message::Append { .. } => {
panic!("saw other leader {} in term {}", msg.from, msg.term);
}
// Leaders don't proxy client requests.
Message::ClientResponse { .. } => panic!("unexpected message {msg:?}"),
}
Ok(self.into())
}
/// Processes a logical clock tick.
///
/// Leaders use ticks to pace their heartbeat broadcasts: every tick bumps a
/// counter, and once the configured interval has elapsed a heartbeat is sent
/// to all peers (which also resets the counter, via heartbeat()).
fn tick(mut self) -> Result<Node> {
    // One more tick has passed since the last heartbeat broadcast.
    self.role.since_heartbeat += 1;
    // Broadcast a heartbeat once the interval is due.
    let heartbeat_due = self.role.since_heartbeat >= self.opts.heartbeat_interval;
    if heartbeat_due {
        self.heartbeat()?;
    }
    Ok(self.into())
}
/// Broadcasts a heartbeat to all peers.
///
/// The heartbeat carries the leader's last log index, its commit index, and
/// the current read sequence number, so followers can confirm leadership for
/// linearizable reads and detect a lagging log. Also resets the tick counter
/// used to pace heartbeats.
fn heartbeat(&mut self) -> Result<()> {
    let (last_index, last_term) = self.log.get_last_index();
    let (commit_index, _) = self.log.get_commit_index();
    // Invariant: a leader's last log entry is always from its own term.
    assert_eq!(last_term, self.term(), "leader's last_term not in current term");
    // Restart the heartbeat interval countdown.
    self.role.since_heartbeat = 0;
    let message =
        Message::Heartbeat { last_index, commit_index, read_seq: self.role.read_seq };
    self.broadcast(message)
}
/// Proposes a command for consensus by appending it to our log and
/// replicating it to peers. If successful, it will eventually be committed
/// and applied to the state machine.
///
/// Returns the log index at which the command was appended.
fn propose(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
    // Append the command to our own log first; replication starts from here.
    let index = self.log.append(command)?;
    // Iterate peers in sorted order for deterministic message ordering.
    let peers: Vec<_> = self.peers.iter().copied().sorted().collect();
    for peer in peers {
        // Only send the entry eagerly to peers in steady state, i.e. those
        // whose next expected entry is exactly this one. Otherwise the peer
        // is lagging and we're still probing past entries for a match.
        if self.progress(peer).next_index == index {
            self.maybe_send_append(peer, false)?;
        }
    }
    Ok(index)
}
/// Commits new entries that have been replicated to a quorum and applies
/// them to the state machine, returning results to clients.
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
// Determine the new commit index by quorum.
let (last_index, _) = self.log.get_last_index();
let quorum_index = self.quorum_value(
self.role.progress.values().map(|p| p.match_index).chain([last_index]).collect(),
);
// If the commit index doesn't advance, do nothing. We don't assert on
// this, since the quorum value may regress e.g. following a restart or
// leader change where followers are initialized with match index 0.
let (old_index, old_term) = self.log.get_commit_index();
if quorum_index <= old_index {
return Ok(old_index);
}
// We can only safely commit an entry from our own term (see section
// 5.4.2 in Raft paper).
match self.log.get(quorum_index)? {
Some(entry) if entry.term == self.term() => {}
Some(_) => return Ok(old_index),
None => panic!("commit index {quorum_index} missing"),
}
// Commit entries.
self.log.commit(quorum_index)?;
// Apply entries and respond to clients.
let term = self.term();
let mut iter = self.log.scan_apply(self.state.get_applied_index());
while let Some(entry) = iter.next().transpose()? {
debug!("Applying {entry:?}");
let write = self.role.writes.remove(&entry.index);
let result = self.state.apply(entry);
if let Some(Write { id, from: to }) = write {
let message = Message::ClientResponse { id, response: result.map(Response::Write) };
Self::send_with(&self.tx, Envelope { from: self.id, term, to, message })?;