Skip to content

Commit

Permalink
bugfix: pesist commitIndex into raft meta log
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Feb 23, 2020
1 parent bd706cf commit 128a159
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-all</artifactId>
<artifactId>raft-java-parent</artifactId>
<version>1.9.0</version>
<packaging>pom</packaging>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public RaftNode(RaftOptions raftOptions,

currentTerm = raftLog.getMetaData().getCurrentTerm();
votedFor = raftLog.getMetaData().getVotedFor();
commitIndex = Math.max(snapshot.getMetaData().getLastIncludedIndex(), commitIndex);
commitIndex = Math.max(snapshot.getMetaData().getLastIncludedIndex(), raftLog.getMetaData().getCommitIndex());
// discard old log entries
if (snapshot.getMetaData().getLastIncludedIndex() > 0
&& raftLog.getFirstLogIndex() <= snapshot.getMetaData().getLastIncludedIndex()) {
Expand Down Expand Up @@ -156,7 +156,7 @@ public boolean replicate(byte[] data, RaftProto.EntryType entryType) {
List<RaftProto.LogEntry> entries = new ArrayList<>();
entries.add(logEntry);
newLastLogIndex = raftLog.append(entries);
raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex());
// raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex());

for (RaftProto.Server server : configuration.getServersList()) {
final Peer peer = peerMap.get(server.getServerId());
Expand Down Expand Up @@ -304,7 +304,7 @@ public void stepDown(long newTerm) {
currentTerm = newTerm;
leaderId = 0;
votedFor = 0;
raftLog.updateMetaData(currentTerm, votedFor, null);
raftLog.updateMetaData(currentTerm, votedFor, null, null);
}
state = NodeState.STATE_FOLLOWER;
// stop heartbeat
Expand Down Expand Up @@ -760,6 +760,7 @@ private void advanceCommitIndex() {
}
long oldCommitIndex = commitIndex;
commitIndex = newCommitIndex;
raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex(), commitIndex);
// 同步到状态机
for (long index = oldCommitIndex + 1; index <= newCommitIndex; index++) {
RaftProto.LogEntry entry = raftLog.getEntry(index);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message LogMetaData {
optional uint64 current_term = 1;
optional uint32 voted_for = 2;
optional uint64 first_log_index = 3;
optional uint64 commit_index = 4;
}

message SnapshotMetaData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RaftProto.VoteResponse requestVote(RaftProto.VoteRequest request) {
if (raftNode.getVotedFor() == 0 && logIsOk) {
raftNode.stepDown(request.getTerm());
raftNode.setVotedFor(request.getServerId());
raftNode.getRaftLog().updateMetaData(raftNode.getCurrentTerm(), raftNode.getVotedFor(), null);
raftNode.getRaftLog().updateMetaData(raftNode.getCurrentTerm(), raftNode.getVotedFor(), null, null);
responseBuilder.setGranted(true);
responseBuilder.setTerm(raftNode.getCurrentTerm());
}
Expand Down Expand Up @@ -174,8 +174,8 @@ public RaftProto.AppendEntriesResponse appendEntries(RaftProto.AppendEntriesRequ
entries.add(entry);
}
raftNode.getRaftLog().append(entries);
raftNode.getRaftLog().updateMetaData(raftNode.getCurrentTerm(),
null, raftNode.getRaftLog().getFirstLogIndex());
// raftNode.getRaftLog().updateMetaData(raftNode.getCurrentTerm(),
// null, raftNode.getRaftLog().getFirstLogIndex());
responseBuilder.setLastLogIndex(raftNode.getRaftLog().getLastLogIndex());

advanceCommitIndex(request);
Expand Down Expand Up @@ -313,6 +313,7 @@ private void advanceCommitIndex(RaftProto.AppendEntriesRequest request) {
long newCommitIndex = Math.min(request.getCommitIndex(),
request.getPrevLogIndex() + request.getEntriesCount());
raftNode.setCommitIndex(newCommitIndex);
raftNode.getRaftLog().updateMetaData(null,null, null, newCommitIndex);
if (raftNode.getLastAppliedIndex() < raftNode.getCommitIndex()) {
// apply state machine
for (long index = raftNode.getLastAppliedIndex() + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void truncatePrefix(long newFirstIndex) {
} else {
newActualFirstIndex = startLogIndexSegmentMap.firstKey();
}
updateMetaData(null, null, newActualFirstIndex);
updateMetaData(null, null, newActualFirstIndex, null);
LOG.info("Truncating log from old first index {} to new first index {}",
oldFirstIndex, newActualFirstIndex);
}
Expand Down Expand Up @@ -316,7 +316,15 @@ public RaftProto.LogMetaData readMetaData() {
}
}

public void updateMetaData(Long currentTerm, Integer votedFor, Long firstLogIndex) {
/**
* 更新raft log meta data,
* 包括commitIndex, fix bug: https://github.com/wenweihu86/raft-java/issues/19
* @param currentTerm
* @param votedFor
* @param firstLogIndex
* @param commitIndex
*/
public void updateMetaData(Long currentTerm, Integer votedFor, Long firstLogIndex, Long commitIndex) {
RaftProto.LogMetaData.Builder builder = RaftProto.LogMetaData.newBuilder(this.metaData);
if (currentTerm != null) {
builder.setCurrentTerm(currentTerm);
Expand All @@ -327,6 +335,9 @@ public void updateMetaData(Long currentTerm, Integer votedFor, Long firstLogInde
if (firstLogIndex != null) {
builder.setFirstLogIndex(firstLogIndex);
}
if (commitIndex != null) {
builder.setCommitIndex(commitIndex);
}
this.metaData = builder.build();

String fileName = logDir + File.separator + "metadata";
Expand Down

0 comments on commit 128a159

Please sign in to comment.