-
Notifications
You must be signed in to change notification settings - Fork 557
/
node.rs
2029 lines (1806 loc) · 81.4 KB
/
node.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use super::{
Envelope, Index, Log, Message, ReadSequence, Request, RequestID, Response, State, Status,
};
use crate::errinput;
use crate::error::{Error, Result};
use itertools::Itertools as _;
use log::{debug, info};
use rand::Rng as _;
use std::collections::{HashMap, HashSet, VecDeque};
/// A node ID. Every node in the cluster, including the local one, is
/// addressed by one of these.
pub type NodeID = u8;
/// A leader term. Outbound messages are stamped with the sender's term, and
/// receivers compare it against their own to drop stale messages or to step
/// into a newer term.
pub type Term = u64;
/// A logical clock interval as number of ticks. Used for heartbeat intervals
/// and election timeouts.
pub type Ticks = u8;
/// Raft node options.
///
/// All intervals are expressed in logical clock ticks. The type derives
/// `Clone` and `Debug` so that callers can copy a configuration between nodes
/// and include it in diagnostics.
#[derive(Clone, Debug)]
pub struct Options {
    /// The number of ticks between leader heartbeats.
    pub heartbeat_interval: Ticks,
    /// The range of randomized election timeouts for followers and candidates.
    pub election_timeout_range: std::ops::Range<Ticks>,
    /// Maximum number of entries to send in a single Append message.
    pub max_append_entries: usize,
}
impl Default for Options {
fn default() -> Self {
Self {
heartbeat_interval: super::HEARTBEAT_INTERVAL,
election_timeout_range: super::ELECTION_TIMEOUT_RANGE,
max_append_entries: super::MAX_APPEND_ENTRIES,
}
}
}
/// A Raft node, with a dynamic role. The node is driven synchronously by
/// processing inbound messages via step() or by advancing time via tick().
/// These methods consume the current node, and return a new one with a possibly
/// different role. Outbound messages are sent via the given node_tx channel.
///
/// This enum wraps the RawNode<Role> types, which implement the actual
/// node logic. It exists for ergonomic use across role transitions, i.e.
/// node = node.step()?.
pub enum Node {
    /// A node that is campaigning to become leader.
    Candidate(RawNode<Candidate>),
    /// A node that replicates state from a leader, or has not found one yet.
    Follower(RawNode<Follower>),
    /// A node that serves requests and replicates its log to followers.
    Leader(RawNode<Leader>),
}
/// Helper macro which calls a closure on the inner RawNode<R>, regardless of
/// which role variant the Node currently holds.
///
/// Two forms are supported:
///
/// * `with_rawnode!(ref node, closure)` borrows the node and hands the closure
///   a `&RawNode<R>`.
/// * `with_rawnode!(node, closure)` consumes the node and hands the closure
///   the owned `RawNode<R>`.
///
/// The closure expression is expanded once per match arm, so it is
/// type-checked separately for every role.
macro_rules! with_rawnode {
    // Borrowed (using ref keyword).
    (ref $node:expr, $fn:expr) => {{
        // Local generic helper: ties the closure's parameter type to the
        // role of the node in each match arm.
        fn with_rawnode<R: Role, T>(node: &RawNode<R>, f: impl Fn(&RawNode<R>) -> T) -> T {
            f(node)
        }
        match $node {
            Node::Candidate(ref n) => with_rawnode(n, $fn),
            Node::Follower(ref n) => with_rawnode(n, $fn),
            Node::Leader(ref n) => with_rawnode(n, $fn),
        }
    }};
    // Owned.
    ($node:expr, $fn:expr) => {{
        // Same helper as above, but taking the node by value so the closure
        // may consume it (e.g. for role transitions).
        fn with_rawnode<R: Role, T>(node: RawNode<R>, f: impl FnOnce(RawNode<R>) -> T) -> T {
            f(node)
        }
        match $node {
            Node::Candidate(n) => with_rawnode(n, $fn),
            Node::Follower(n) => with_rawnode(n, $fn),
            Node::Leader(n) => with_rawnode(n, $fn),
        }
    }};
}
impl Node {
/// Creates a new Raft node, starting as a leaderless follower, or leader if
/// there are no peers.
pub fn new(
id: NodeID,
peers: HashSet<NodeID>,
log: Log,
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
opts: Options,
) -> Result<Self> {
let node = RawNode::new(id, peers, log, state, node_tx, opts)?;
if node.peers.is_empty() {
// If there are no peers, become leader immediately.
return Ok(node.into_candidate()?.into_leader()?.into());
}
Ok(node.into())
}
/// Returns the node ID.
pub fn id(&self) -> NodeID {
with_rawnode!(ref self, |n| n.id)
}
/// Returns the node term.
pub fn term(&self) -> Term {
with_rawnode!(ref self, |n| n.term())
}
/// Processes an inbound message.
pub fn step(self, msg: Envelope) -> Result<Self> {
with_rawnode!(ref self, |n| {
assert_eq!(msg.to, n.id, "message to other node: {msg:?}");
assert!(n.peers.contains(&msg.from) || msg.from == n.id, "unknown sender: {msg:?}");
});
debug!("Stepping {msg:?}");
with_rawnode!(self, |n| n.step(msg))
}
/// Moves time forward by a tick.
pub fn tick(self) -> Result<Self> {
with_rawnode!(self, |n| n.tick())
}
}
impl From<RawNode<Candidate>> for Node {
fn from(n: RawNode<Candidate>) -> Self {
Node::Candidate(n)
}
}
impl From<RawNode<Follower>> for Node {
fn from(n: RawNode<Follower>) -> Self {
Node::Follower(n)
}
}
impl From<RawNode<Leader>> for Node {
fn from(n: RawNode<Leader>) -> Self {
Node::Leader(n)
}
}
/// A Raft role: leader, follower, or candidate.
///
/// This is a marker trait with no methods. It only bounds the role type
/// parameter of RawNode.
pub trait Role {}
/// A Raft node with the concrete role R.
///
/// This implements the typestate pattern, where individual node states (roles)
/// are encoded as RawNode<Role>. See: https://cliffle.com/blog/rust-typestate/
pub struct RawNode<R: Role = Follower> {
    /// This node's ID. Must not be present in `peers`.
    id: NodeID,
    /// The IDs of all other nodes in the cluster (i.e. excluding this node).
    peers: HashSet<NodeID>,
    /// The Raft log, which also stores the current term and vote.
    log: Log,
    /// The state machine that committed log entries are applied to.
    state: Box<dyn State>,
    /// Channel for outbound messages.
    node_tx: crossbeam::channel::Sender<Envelope>,
    /// Node options (heartbeat interval, election timeouts, append batch size).
    opts: Options,
    /// Role-specific state.
    role: R,
}
impl<R: Role> RawNode<R> {
    /// Role transition helper: carries every role-independent field over into
    /// a node with the given new role. The previous role state is discarded.
    fn into_role<T: Role>(self, role: T) -> RawNode<T> {
        let RawNode { id, peers, log, state, node_tx, opts, role: _ } = self;
        RawNode { id, peers, log, state, node_tx, opts, role }
    }

    /// Returns the current term, as stored in the log.
    fn term(&self) -> Term {
        let (term, _vote) = self.log.get_term();
        term
    }

    /// Returns the number of nodes in the cluster: all peers plus ourself.
    fn cluster_size(&self) -> usize {
        1 + self.peers.len()
    }

    /// Returns the number of nodes that make up a strict majority.
    fn quorum_size(&self) -> usize {
        let nodes = self.cluster_size();
        nodes / 2 + 1
    }

    /// Returns the quorum value of the given unsorted values, i.e. the
    /// largest value that at least a quorum of the entries are greater than
    /// or equal to. There must be exactly one value per cluster node.
    fn quorum_value<T: Ord + Copy>(&self, mut values: Vec<T>) -> T {
        assert_eq!(values.len(), self.cluster_size(), "vector size must match cluster size");
        // Partition in descending order around the quorum position.
        let nth = self.quorum_size() - 1;
        let (_, quorum, _) = values.select_nth_unstable_by(nth, |a: &T, b: &T| b.cmp(a));
        *quorum
    }

    /// Sends a message to the given node, stamped with our ID and term.
    fn send(&self, to: NodeID, message: Message) -> Result<()> {
        let envelope = Envelope { from: self.id, to, term: self.term(), message };
        Self::send_with(&self.node_tx, envelope)
    }

    /// Sends an envelope over the given channel. Takes the channel rather than
    /// self, so that callers can hold partial borrows of the node.
    fn send_with(tx: &crossbeam::channel::Sender<Envelope>, msg: Envelope) -> Result<()> {
        debug!("Sending {msg:?}");
        tx.send(msg)?;
        Ok(())
    }

    /// Sends a copy of the message to every peer, stopping at the first error.
    fn broadcast(&self, message: Message) -> Result<()> {
        // Visit peers in sorted order for test determinism.
        self.peers.iter().copied().sorted().try_for_each(|peer| self.send(peer, message.clone()))
    }

    /// Picks a random election timeout from the configured range.
    fn gen_election_timeout(&self) -> Ticks {
        let range = self.opts.election_timeout_range.clone();
        rand::thread_rng().gen_range(range)
    }
}
/// A candidate is campaigning to become a leader.
pub struct Candidate {
    /// Votes received (including ourself). Once the number of votes reaches
    /// the quorum size, the candidate becomes leader.
    votes: HashSet<NodeID>,
    /// Ticks elapsed since election start.
    election_duration: Ticks,
    /// Election timeout, in ticks. When the election duration reaches it, a
    /// new election is started.
    election_timeout: Ticks,
}
impl Candidate {
/// Creates a new candidate role.
fn new(election_timeout: Ticks) -> Self {
Self { votes: HashSet::new(), election_duration: 0, election_timeout }
}
}
impl Role for Candidate {}
impl RawNode<Candidate> {
/// Turns the candidate into a follower. With a known leader we lost the
/// election in the current term and follow the winner; without one we have
/// discovered a newer term and enter it as a leaderless follower.
fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
    let election_timeout = self.gen_election_timeout();
    match leader {
        // Somebody else won the election in our term.
        Some(leader) => {
            assert_eq!(term, self.term(), "can't follow leader in different term");
            info!("Lost election, following leader {leader} in term {term}");
        }
        // A newer term exists, but its leader (if any) is still unknown.
        // Stepping a message from it will reveal it.
        None => {
            assert_ne!(term, self.term(), "can't become leaderless follower in current term");
            info!("Discovered new term {term}");
            self.log.set_term(term, None)?;
        }
    }
    Ok(self.into_role(Follower::new(leader, election_timeout)))
}
/// Turns the candidate into a leader after winning the election.
fn into_leader(self) -> Result<RawNode<Leader>> {
    let (term, vote) = self.log.get_term();
    assert_ne!(term, 0, "leaders can't have term 0");
    assert_eq!(vote, Some(self.id), "leader did not vote for self");
    info!("Won election for term {term}, becoming leader");

    let last_index = self.log.get_last_index().0;
    let role = Leader::new(self.peers.clone(), last_index);
    let mut leader = self.into_role(role);

    // On assuming leadership, propose an empty command to disambiguate
    // previous entries in the log (section 8 of the Raft paper).
    //
    // This goes out before the heartbeat, which avoids a wasted replication
    // roundtrip when the heartbeat response shows the peer is behind.
    leader.propose(None)?;
    leader.maybe_commit_and_apply()?;
    leader.heartbeat()?;
    Ok(leader)
}
/// Processes a message, returning the node in its resulting role: still a
/// candidate, a follower (if a leader or newer term was discovered), or a
/// leader (if the message was the vote that gave us quorum).
///
/// Panics on messages that only leaders or forwarding followers should see.
fn step(mut self, msg: Envelope) -> Result<Node> {
    // Drop messages from past terms.
    if msg.term < self.term() {
        debug!("Dropping message from past term ({:?})", msg);
        return Ok(self.into());
    }
    // If we receive a message for a future term, become a leaderless
    // follower in it and step the message. If the message is a Heartbeat or
    // Append from the leader, stepping it will follow the leader.
    if msg.term > self.term() {
        return self.into_follower(msg.term, None)?.step(msg);
    }
    // From here on, the message is from our current term.
    match msg.message {
        // Don't grant votes for other candidates who also campaign.
        Message::Campaign { .. } => {
            self.send(msg.from, Message::CampaignResponse { vote: false })?
        }
        // If we received a vote, record it. If the vote gives us quorum,
        // assume leadership.
        Message::CampaignResponse { vote: true } => {
            self.role.votes.insert(msg.from);
            if self.role.votes.len() >= self.quorum_size() {
                return Ok(self.into_leader()?.into());
            }
        }
        // We didn't get a vote. :(
        Message::CampaignResponse { vote: false } => {}
        // If we receive a heartbeat or entries in this term, we lost the
        // election and have a new leader. Follow it and step the message.
        Message::Heartbeat { .. } | Message::Append { .. } => {
            return self.into_follower(msg.term, Some(msg.from))?.step(msg);
        }
        // Abort any inbound client requests while candidate.
        Message::ClientRequest { id, .. } => {
            self.send(msg.from, Message::ClientResponse { id, response: Err(Error::Abort) })?;
        }
        // We're not a leader in this term, nor are we forwarding requests,
        // so we shouldn't see these.
        Message::HeartbeatResponse { .. }
        | Message::AppendResponse { .. }
        | Message::ClientResponse { .. } => panic!("Received unexpected message {:?}", msg),
    }
    Ok(self.into())
}
/// Advances the logical clock by one tick. If the election has run for as
/// long as its timeout, a new election is started.
fn tick(mut self) -> Result<Node> {
    self.role.election_duration += 1;
    let timed_out = self.role.election_duration >= self.role.election_timeout;
    if timed_out {
        self.campaign()?;
    }
    // Campaigning resets the role, so the duration must be below the timeout.
    let (elapsed, timeout) = (self.role.election_duration, self.role.election_timeout);
    assert!(elapsed < timeout, "past election timeout");
    Ok(self.into())
}
/// Starts an election: moves to the next term, resets the candidate state,
/// votes for ourself, and asks every peer for its vote.
fn campaign(&mut self) -> Result<()> {
    let term = self.term() + 1;
    info!("Starting new election for term {term}");

    // Start from a clean slate with a fresh timeout and our own vote.
    let mut role = Candidate::new(self.gen_election_timeout());
    role.votes.insert(self.id);
    self.role = role;
    self.log.set_term(term, Some(self.id))?;

    let (last_index, last_term) = self.log.get_last_index();
    self.broadcast(Message::Campaign { last_index, last_term })
}
}
/// A follower replicates state from a leader.
pub struct Follower {
    /// The leader, or None if just initialized.
    leader: Option<NodeID>,
    /// The number of ticks since the last message from the leader.
    leader_seen: Ticks,
    /// The leader_seen timeout before triggering an election.
    election_timeout: Ticks,
    /// Local client requests that have been forwarded to the leader. These are
    /// aborted on leader/term changes.
    forwarded: HashSet<RequestID>,
}
impl Follower {
/// Creates a new follower role.
fn new(leader: Option<NodeID>, election_timeout: Ticks) -> Self {
Self { leader, leader_seen: 0, election_timeout, forwarded: HashSet::new() }
}
}
impl Role for Follower {}
impl RawNode<Follower> {
/// Creates a new node as a leaderless follower.
fn new(
id: NodeID,
peers: HashSet<NodeID>,
log: Log,
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
opts: Options,
) -> Result<Self> {
if peers.contains(&id) {
return errinput!("node ID {id} can't be in peers");
}
let role = Follower::new(None, 0);
let mut node = Self { id, peers, log, state, node_tx, opts, role };
node.role.election_timeout = node.gen_election_timeout();
Ok(node)
}
/// Turns the follower into a candidate and starts campaigning for leadership
/// in a new term.
fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
    // Forwarded requests can't complete any more; clients must retry them
    // with the new leader.
    self.abort_forwarded()?;
    // Catch up on pending log entries, so we're current if we win.
    self.maybe_apply()?;

    // Switch roles and campaign.
    let role = Candidate::new(self.gen_election_timeout());
    let mut candidate = self.into_role(role);
    candidate.campaign()?;

    let voted_for_self = candidate.role.votes.contains(&candidate.id);
    assert!(voted_for_self, "candidate did not vote for self");
    let (term, vote) = candidate.log.get_term();
    assert_ne!(term, 0, "candidate can't have term 0");
    assert_eq!(vote, Some(candidate.id), "log vote does not match self");
    Ok(candidate)
}
/// Resets the follower state. With a leader, we start following the winner
/// of the current term's election; without one, we enter a newly discovered
/// term as a leaderless follower (e.g. because someone called an election).
fn into_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
    assert_ne!(term, 0, "can't become follower in term 0");
    // Forwarded requests can't complete any more; clients must retry them
    // with the new leader.
    self.abort_forwarded()?;
    self.role = match leader {
        // A leader emerged in our current term. Keep the election timeout.
        Some(leader) => {
            assert!(self.peers.contains(&leader), "leader is not a peer");
            assert_eq!(self.role.leader, None, "already have leader in term");
            assert_eq!(term, self.term(), "can't follow leader in different term");
            info!("Following leader {leader} in term {term}");
            Follower::new(Some(leader), self.role.election_timeout)
        }
        // A newer term exists, but its leader (if any) is still unknown.
        // Stepping a message from it will reveal it.
        None => {
            assert_ne!(term, self.term(), "can't become leaderless follower in current term");
            info!("Discovered new term {term}");
            self.log.set_term(term, None)?;
            Follower::new(None, self.gen_election_timeout())
        }
    };
    Ok(self)
}
/// Processes a message, returning the node in its resulting role.
///
/// Panics if the message violates an invariant, e.g. a second leader in the
/// same term, a client request from another node, or a response that only a
/// leader should receive.
fn step(mut self, msg: Envelope) -> Result<Node> {
    // Drop messages from past terms.
    if msg.term < self.term() {
        debug!("Dropping message from past term ({:?})", msg);
        return Ok(self.into());
    }
    // If we receive a message for a future term, become a leaderless
    // follower in it and step the message. If the message is a Heartbeat or
    // Append from the leader, stepping it will follow the leader.
    if msg.term > self.term() {
        return self.into_follower(None, msg.term)?.step(msg);
    }
    // Record when we last saw a message from the leader (if any).
    if self.is_leader(msg.from) {
        self.role.leader_seen = 0
    }
    match msg.message {
        // The leader will send periodic heartbeats. If we don't have a
        // leader in this term yet, follow it. If the commit_index advances,
        // apply state transitions.
        Message::Heartbeat { last_index, commit_index, read_seq } => {
            assert!(commit_index <= last_index, "commit_index after last_index");
            // Check that the heartbeat is from our leader.
            match self.role.leader {
                Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
                None => self = self.into_follower(Some(msg.from), msg.term)?,
            }
            // Attempt to match the leader's log and respond to the
            // heartbeat. last_index always has the leader's term.
            let match_index = if self.log.has(last_index, msg.term)? { last_index } else { 0 };
            self.send(msg.from, Message::HeartbeatResponse { match_index, read_seq })?;
            // Advance commit index and apply entries. We can only do this
            // if the last_index matches the leader, which implies that
            // the logs are identical up to match_index. This also implies
            // that the commit_index is present in our log.
            if match_index != 0 && commit_index > self.log.get_commit_index().0 {
                self.log.commit(commit_index)?;
                self.maybe_apply()?;
            }
        }
        // Append log entries from the leader to the local log.
        Message::Append { base_index, base_term, entries } => {
            if let Some(first) = entries.first() {
                assert_eq!(base_index, first.index - 1, "base index mismatch");
            }
            // Make sure the message comes from our leader.
            match self.role.leader {
                Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
                None => self = self.into_follower(Some(msg.from), msg.term)?,
            }
            // If the base entry is in our log, append the entries.
            let (mut reject_index, mut match_index) = (0, 0);
            if base_index == 0 || self.log.has(base_index, base_term)? {
                match_index = entries.last().map(|e| e.index).unwrap_or(base_index);
                self.log.splice(entries)?;
            } else {
                // Otherwise, reject the base index. If the local log is
                // shorter than the base index, lower the reject index to
                // skip all the missing entries.
                reject_index = std::cmp::min(base_index, self.log.get_last_index().0 + 1);
            }
            // Exactly one of reject_index and match_index is non-zero here,
            // unless an empty append at base index 0 was accepted.
            self.send(msg.from, Message::AppendResponse { reject_index, match_index })?;
        }
        // A candidate in this term is requesting our vote.
        Message::Campaign { last_index, last_term } => {
            // Don't vote if we already voted for someone else in this term.
            if let (_, Some(vote)) = self.log.get_term() {
                if msg.from != vote {
                    self.send(msg.from, Message::CampaignResponse { vote: false })?;
                    return Ok(self.into());
                }
            }
            // Don't vote if our log is newer than the candidate's log.
            let (log_index, log_term) = self.log.get_last_index();
            if log_term > last_term || log_term == last_term && log_index > last_index {
                self.send(msg.from, Message::CampaignResponse { vote: false })?;
                return Ok(self.into());
            }
            // Grant the vote.
            info!("Voting for {} in term {} election", msg.from, msg.term);
            self.log.set_term(msg.term, Some(msg.from))?;
            self.send(msg.from, Message::CampaignResponse { vote: true })?;
        }
        // We may receive a vote after we lost an election and followed a
        // different leader. Ignore it.
        Message::CampaignResponse { .. } => {}
        // Forward client requests to the leader, or abort them if there is
        // none (the client must retry).
        Message::ClientRequest { id, .. } => {
            assert_eq!(msg.from, self.id, "Client request from other node");
            if let Some(leader) = self.role.leader {
                debug!("Forwarding request to leader {}: {:?}", leader, msg);
                // Track the request so the response can be routed back, or
                // the request aborted on a leader/term change.
                self.role.forwarded.insert(id);
                self.send(leader, msg.message)?
            } else {
                self.send(
                    msg.from,
                    Message::ClientResponse { id, response: Err(Error::Abort) },
                )?
            }
        }
        // Returns client responses for forwarded requests.
        Message::ClientResponse { id, response } => {
            assert!(self.is_leader(msg.from), "Client response from non-leader");
            // Responses for requests we no longer track are dropped.
            if self.role.forwarded.remove(&id) {
                self.send(self.id, Message::ClientResponse { id, response })?;
            }
        }
        // We're not a leader nor candidate in this term, so we shouldn't see these.
        Message::HeartbeatResponse { .. } | Message::AppendResponse { .. } => {
            panic!("Received unexpected message {msg:?}")
        }
    };
    Ok(self.into())
}
/// Advances the logical clock by one tick. If the leader has been silent for
/// the whole election timeout, the follower becomes a candidate.
fn tick(mut self) -> Result<Node> {
    self.role.leader_seen += 1;
    let node: Node = if self.role.leader_seen < self.role.election_timeout {
        self.into()
    } else {
        self.into_candidate()?.into()
    };
    Ok(node)
}
/// Aborts all forwarded requests.
fn abort_forwarded(&mut self) -> Result<()> {
// Sort the IDs for test determinism.
for id in std::mem::take(&mut self.role.forwarded).into_iter().sorted() {
debug!("Aborting forwarded request {:x?}", id);
self.send(self.id, Message::ClientResponse { id, response: Err(Error::Abort) })?;
}
Ok(())
}
/// Applies to the state machine every log entry that is ready to be applied
/// beyond the state machine's applied index.
fn maybe_apply(&mut self) -> Result<()> {
    let applied_index = self.state.get_applied_index();
    let mut pending = self.log.scan_apply(applied_index);
    loop {
        let entry = match pending.next().transpose()? {
            Some(entry) => entry,
            None => return Ok(()),
        };
        debug!("Applying {entry:?}");
        // No client is waiting for the result, so discard it. That includes
        // errors: non-deterministic ones (e.g. IO errors) must panic instead,
        // to avoid replica divergence.
        let _ = self.state.apply(entry);
    }
}
/// Returns true if the given node is the leader we're currently following.
fn is_leader(&self, from: NodeID) -> bool {
    matches!(self.role.leader, Some(leader) if leader == from)
}
}
/// Follower replication progress.
struct Progress {
    /// The next index to replicate to the follower.
    next_index: Index,
    /// The last index where the follower's log matches the leader.
    match_index: Index,
    /// The last read sequence number confirmed by the peer.
    read_seq: ReadSequence,
}
impl Progress {
    /// Tries to move the follower's match index forward, and reports whether
    /// it moved. The next index is pulled up to just past the new match index
    /// when it lags behind, and is otherwise left alone so it never regresses
    /// needlessly.
    fn advance(&mut self, match_index: Index) -> bool {
        let advanced = match_index > self.match_index;
        if advanced {
            self.match_index = match_index;
            self.next_index = self.next_index.max(match_index + 1);
        }
        advanced
    }

    /// Tries to move the follower's read sequence number forward, and reports
    /// whether it moved.
    fn advance_read(&mut self, read_seq: ReadSequence) -> bool {
        let advanced = read_seq > self.read_seq;
        if advanced {
            self.read_seq = read_seq;
        }
        advanced
    }

    /// Moves the next index back to the given index if it is currently beyond
    /// it, but never to below match_index + 1. Reports whether the next index
    /// changed.
    fn regress_next(&mut self, next_index: Index) -> bool {
        // Not a regression at all.
        if next_index >= self.next_index {
            return false;
        }
        // Already as low as it can go.
        let floor = self.match_index + 1;
        if self.next_index <= floor {
            return false;
        }
        self.next_index = std::cmp::max(next_index, floor);
        true
    }
}
/// A pending client write request.
struct Write {
    /// The node which submitted the write. The client response is sent to it.
    from: NodeID,
    /// The write request ID, echoed back in the client response.
    id: RequestID,
}
/// A pending client read request.
struct Read {
    /// The sequence number of this read. Assigned by the leader when the
    /// request is received.
    seq: ReadSequence,
    /// The node which submitted the read. The client response is sent to it.
    from: NodeID,
    /// The read request ID, echoed back in the client response.
    id: RequestID,
    /// The read command.
    command: Vec<u8>,
}
/// A leader serves requests and replicates the log to followers.
pub struct Leader {
    /// Follower replication progress, keyed by peer ID.
    progress: HashMap<NodeID, Progress>,
    /// Keeps track of pending write requests, keyed by log index. These are
    /// added when the write is proposed and appended to the leader's log, and
    /// removed when the command is applied to the state machine, sending the
    /// command result to the waiting client.
    ///
    /// If the leader loses leadership, all pending write requests are aborted
    /// by returning Error::Abort.
    writes: HashMap<Index, Write>,
    /// Keeps track of pending read requests. To guarantee linearizability, read
    /// requests are assigned a sequence number and registered here when
    /// received, but only executed once a quorum of nodes have confirmed the
    /// current leader by responding to heartbeats with the sequence number.
    ///
    /// If we lose leadership before the command is processed, all pending read
    /// requests are aborted by returning Error::Abort.
    reads: VecDeque<Read>,
    /// The read sequence number used for the last read. Incremented for every
    /// read command, and reset when we lose leadership (thus only valid for
    /// this term).
    read_seq: ReadSequence,
    /// Number of ticks since last periodic heartbeat.
    since_heartbeat: Ticks,
}
impl Leader {
/// Creates a new leader role.
fn new(peers: HashSet<NodeID>, last_index: Index) -> Self {
let next_index = last_index + 1;
let progress = peers
.into_iter()
.map(|p| (p, Progress { next_index, match_index: 0, read_seq: 0 }))
.collect();
Self {
progress,
writes: HashMap::new(),
reads: VecDeque::new(),
read_seq: 0,
since_heartbeat: 0,
}
}
}
impl Role for Leader {}
impl RawNode<Leader> {
/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Subsequently
/// stepping the received message may discover the leader, if there is one.
fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
assert!(term > self.term(), "leader can only become follower in later term");
info!("Discovered new term {term}");
// Cancel in-flight requests.
for write in std::mem::take(&mut self.role.writes).into_values().sorted_by_key(|w| w.id) {
self.send(
write.from,
Message::ClientResponse { id: write.id, response: Err(Error::Abort) },
)?;
}
for read in std::mem::take(&mut self.role.reads).into_iter().sorted_by_key(|r| r.id) {
self.send(
read.from,
Message::ClientResponse { id: read.id, response: Err(Error::Abort) },
)?;
}
self.log.set_term(term, None)?;
let election_timeout = self.gen_election_timeout();
Ok(self.into_role(Follower::new(None, election_timeout)))
}
/// Processes a message, returning the node in its resulting role: still a
/// leader, or a follower if a newer term was discovered.
///
/// Panics if the message violates an invariant, e.g. another leader in the
/// same term, a malformed append response, or a client response.
fn step(mut self, msg: Envelope) -> Result<Node> {
    // Drop messages from past terms.
    if msg.term < self.term() {
        debug!("Dropping message from past term ({:?})", msg);
        return Ok(self.into());
    }
    // If we receive a message for a future term, become a leaderless
    // follower in it and step the message. If the message is a Heartbeat or
    // Append from the leader, stepping it will follow the leader.
    if msg.term > self.term() {
        return self.into_follower(msg.term)?.step(msg);
    }
    // From here on, the message is from our current term.
    match msg.message {
        // There can't be two leaders in the same term.
        Message::Heartbeat { .. } | Message::Append { .. } => {
            panic!("saw other leader {} in term {}", msg.from, msg.term);
        }
        // A follower received our heartbeat and confirms our leadership.
        Message::HeartbeatResponse { match_index, read_seq } => {
            let (last_index, _) = self.log.get_last_index();
            assert!(match_index <= last_index, "future match index");
            assert!(read_seq <= self.role.read_seq, "future read sequence number");
            // If the read sequence number advances, try to execute reads.
            if self.progress(msg.from).advance_read(read_seq) {
                self.maybe_read()?;
            }
            if match_index == 0 {
                // If the follower didn't match our last index, an append to
                // it must have failed (or it's catching up). Probe it to
                // discover a matching entry and start replicating. Move
                // next_index back to last_index since the follower just
                // told us it doesn't have it.
                self.progress(msg.from).regress_next(last_index);
                self.maybe_send_append(msg.from, true)?;
            } else if self.progress(msg.from).advance(match_index) {
                // If the follower's match index advanced, an append
                // response got lost. Try to commit.
                //
                // We don't need to eagerly send any pending entries, since
                // any proposals made after this heartbeat was sent should
                // have been eagerly replicated in steady state. If not, the
                // next heartbeat will trigger a probe above.
                self.maybe_commit_and_apply()?;
            }
        }
        // A follower appended our log entries. Record its progress and
        // attempt to commit.
        Message::AppendResponse { match_index, reject_index: 0 } if match_index > 0 => {
            let (last_index, _) = self.log.get_last_index();
            assert!(match_index <= last_index, "follower matched unknown index");
            if self.progress(msg.from).advance(match_index) {
                self.maybe_commit_and_apply()?;
            }
            // Eagerly send any further pending entries. The peer may be
            // lagging behind the leader, and we're catching it up one
            // MAX_APPEND_ENTRIES batch at a time. Or we may have received a
            // probe response at the known match index, in which case
            // advance() above would have returned false.
            self.maybe_send_append(msg.from, false)?;
        }
        // A follower rejected the log entries because the base entry in
        // reject_index did not match its log. Try a previous entry until we
        // find a common base.
        //
        // This linear probing can be slow with long divergent logs, but we
        // keep it simple.
        Message::AppendResponse { reject_index, match_index: 0 } if reject_index > 0 => {
            let (last_index, _) = self.log.get_last_index();
            assert!(reject_index <= last_index, "follower rejected unknown index");
            // If the rejected base index is at or below the match index,
            // the rejection is stale and can be ignored.
            if reject_index <= self.progress(msg.from).match_index {
                return Ok(self.into());
            }
            // Probe below the reject index, if we haven't already moved
            // next_index below it.
            if self.progress(msg.from).regress_next(reject_index) {
                self.maybe_send_append(msg.from, true)?;
            }
        }
        // An append response must have exactly one of match_index and
        // reject_index set; anything else is invalid.
        Message::AppendResponse { .. } => panic!("invalid message {msg:?}"),
        // A client submitted a read command. To ensure linearizability, we
        // must confirm that we are still the leader by sending a heartbeat
        // with the read's sequence number and wait for confirmation from a
        // quorum before executing the read.
        Message::ClientRequest { id, request: Request::Read(command) } => {
            self.role.read_seq += 1;
            self.role.reads.push_back(Read {
                seq: self.role.read_seq,
                from: msg.from,
                id,
                command,
            });
            // Without peers there are no heartbeat responses to wait for, so
            // try to execute the read right away.
            if self.peers.is_empty() {
                self.maybe_read()?;
            }
            self.heartbeat()?;
        }
        // A client submitted a write command. Propose it, and track it
        // until it's applied and the response is returned to the client.
        Message::ClientRequest { id, request: Request::Write(command) } => {
            let index = self.propose(Some(command))?;
            self.role.writes.insert(index, Write { from: msg.from, id });
            // Without peers there are no append responses to trigger the
            // commit, so try to commit right away.
            if self.peers.is_empty() {
                self.maybe_commit_and_apply()?;
            }
        }
        // A client requested the cluster status. Respond with the leader's
        // view of it.
        Message::ClientRequest { id, request: Request::Status } => {
            let status = Status {
                leader: self.id,
                term: self.term(),
                // Match indexes of all peers, plus our own last index.
                match_index: self
                    .role
                    .progress
                    .iter()
                    .map(|(id, p)| (*id, p.match_index))
                    .chain(std::iter::once((self.id, self.log.get_last_index().0)))
                    .collect(),
                commit_index: self.log.get_commit_index().0,
                apply_index: self.state.get_applied_index(),
                storage: self.log.status()?,
            };
            self.send(
                msg.from,
                Message::ClientResponse { id, response: Ok(Response::Status(status)) },
            )?;
        }
        // Don't grant other votes in this term.
        Message::Campaign { .. } => {
            self.send(msg.from, Message::CampaignResponse { vote: false })?
        }
        // Votes can come in after we won the election, ignore them.
        Message::CampaignResponse { .. } => {}
        // Leaders never proxy client requests, so we don't expect to see
        // responses from other nodes.
        Message::ClientResponse { .. } => panic!("Unexpected message {:?}", msg),
    }
    Ok(self.into())
}
/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= self.opts.heartbeat_interval {
self.heartbeat()?;
self.role.since_heartbeat = 0;
}
Ok(self.into())
}
/// Sends every peer a heartbeat carrying our last log index, commit index,
/// and latest read sequence number.
fn heartbeat(&mut self) -> Result<()> {
    let (last_index, last_term) = self.log.get_last_index();
    assert_eq!(last_term, self.term(), "leader has stale last_term");
    let commit_index = self.log.get_commit_index().0;
    let read_seq = self.role.read_seq;
    // NB: self.since_heartbeat is deliberately left alone, since periodic
    // heartbeats should go out regardless of any on-demand heartbeats.
    self.broadcast(Message::Heartbeat { last_index, commit_index, read_seq })
}
/// Mutably borrows the replication progress of the given peer. Panics if the
/// node is not a known peer.
fn progress(&mut self, id: NodeID) -> &mut Progress {
    match self.role.progress.get_mut(&id) {
        Some(progress) => progress,
        None => panic!("unknown node"),
    }
}
/// Proposes a command for consensus by appending it to our log and
/// replicating it to peers. If successful, it will eventually be committed
/// and applied to the state machine.
fn propose(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
let index = self.log.append(command)?;
for peer in self.peers.iter().copied().sorted() {
// Eagerly send the entry to the peer, but only if it is in steady
// state where we've already sent the previous entries. Otherwise,
// we're probing a lagging or divergent peer, and there's no point
// sending this entry since it will likely be rejected.
if index == self.progress(peer).next_index {
self.maybe_send_append(peer, false)?;
}
}
Ok(index)
}
/// Commits any new log entries that have been replicated to a quorum, and
/// applies them to the state machine.
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
// Determine the new commit index.
let quorum_index = self.quorum_value(
self.role
.progress
.values()
.map(|p| p.match_index)
.chain(std::iter::once(self.log.get_last_index().0))
.collect(),
);
// If the commit index doesn't advance, do nothing. We don't assert on
// this, since the quorum value may regress e.g. following a restart or
// leader change where followers are initialized with log index 0.
let (mut commit_index, old_commit_term) = self.log.get_commit_index();
if quorum_index <= commit_index {
return Ok(commit_index);
}
// We can only safely commit an entry from our own term (see figure 8 in
// Raft paper).
commit_index = match self.log.get(quorum_index)? {
Some(entry) if entry.term == self.term() => quorum_index,
Some(_) => return Ok(commit_index),
None => panic!("missing commit index {quorum_index} missing"),
};