Skip to content

Commit

Permalink
fix snapshot bug when add new servers to cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 1, 2017
1 parent 1461a58 commit ecacf67
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RaftClientServiceProxy implements RaftClientService {
// servers format is 10.1.1.1:8888,10.2.2.2:9999
public RaftClientServiceProxy(String ipPorts) {
rpcClientOptions.setConnectTimeoutMillis(1000); // 1s
rpcClientOptions.setReadTimeoutMillis(10000); // 10s
rpcClientOptions.setReadTimeoutMillis(3600000); // 1hour
rpcClientOptions.setWriteTimeoutMillis(1000); // 1s
clusterRPCClient = new RPCClient(ipPorts, rpcClientOptions);
clusterRaftClientService = RPCProxy.getProxy(clusterRPCClient, RaftClientService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum NodeState {
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();

private RaftMessage.Configuration configuration;
private Map<Integer, Peer> peerMap = new HashMap<>();
private ConcurrentMap<Integer, Peer> peerMap = new ConcurrentHashMap<>();
private RaftMessage.Server localServer;
private StateMachine stateMachine;
private SegmentedLog raftLog;
Expand Down Expand Up @@ -73,6 +73,7 @@ public RaftNode(List<RaftMessage.Server> servers, RaftMessage.Server localServer
// load log and snapshot
raftLog = new SegmentedLog();
snapshot = new Snapshot();
snapshot.reload();

currentTerm = raftLog.getMetaData().getCurrentTerm();
votedFor = raftLog.getMetaData().getVotedFor();
Expand Down Expand Up @@ -110,7 +111,12 @@ public RaftNode(List<RaftMessage.Server> servers, RaftMessage.Server localServer
}

// init thread pool
executorService = Executors.newFixedThreadPool(peerMap.size() * 2);
executorService = new ThreadPoolExecutor(
peerMap.size() * 2,
peerMap.size() * 4,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
Expand Down Expand Up @@ -143,7 +149,8 @@ public boolean replicate(byte[] data, RaftMessage.EntryType entryType) {
newLastLogIndex = raftLog.append(entries);
raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex());

for (final Peer peer : peerMap.values()) {
for (RaftMessage.Server server : configuration.getServersList()) {
final Peer peer = peerMap.get(server.getServerId());
executorService.submit(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -234,7 +241,7 @@ private void requestVote(Peer peer) {
requestBuilder.setServerId(localServer.getServerId())
.setTerm(currentTerm)
.setLastLogIndex(raftLog.getLastLogIndex())
.setLastLogTerm(raftLog.getLastLogTerm());
.setLastLogTerm(getLastLogTerm());
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -370,6 +377,16 @@ public void appendEntries(Peer peer) {
}
}

long lastSnapshotIndex;
long lastSnapshotTerm;
snapshot.getLock().lock();
try {
lastSnapshotIndex = snapshot.getMetaData().getLastIncludedIndex();
lastSnapshotTerm = snapshot.getMetaData().getLastIncludedTerm();
} finally {
snapshot.getLock().unlock();
}

lock.lock();
try {
long firstLogIndex = raftLog.getFirstLogIndex();
Expand All @@ -378,8 +395,8 @@ public void appendEntries(Peer peer) {
long prevLogTerm;
if (prevLogIndex == 0) {
prevLogTerm = 0;
} else if (prevLogIndex == snapshot.getMetaData().getLastIncludedIndex()) {
prevLogTerm = snapshot.getMetaData().getLastIncludedTerm();
} else if (prevLogIndex == lastSnapshotIndex) {
prevLogTerm = lastSnapshotTerm;
} else {
prevLogTerm = raftLog.getEntryTerm(prevLogIndex);
}
Expand Down Expand Up @@ -673,7 +690,7 @@ private void takeSnapshot() {
lock.unlock();
}

long snapshotLastIncludedIndex = 0;
boolean success = false;
snapshot.getLock().lock();
try {
LOG.info("start taking snapshot");
Expand All @@ -692,22 +709,34 @@ private void takeSnapshot() {
FileUtils.moveDirectory(new File(tmpSnapshotDir),
new File(snapshot.getSnapshotDir()));
LOG.info("end taking snapshot, result=success");
snapshotLastIncludedIndex = snapshot.getMetaData().getLastIncludedIndex();
success = true;
} catch (IOException ex) {
LOG.warn("move direct failed when taking snapshot, msg={}", ex.getMessage());
}
} finally {
snapshot.getLock().unlock();
}

// discard old log entries
lock.lock();
try {
if (snapshotLastIncludedIndex > 0 && raftLog.getFirstLogIndex() <= snapshotLastIncludedIndex) {
raftLog.truncatePrefix(snapshotLastIncludedIndex + 1);
if (success) {
// reload the snapshot
long lastSnapshotIndex = 0;
snapshot.getLock().lock();
try {
snapshot.reload();
lastSnapshotIndex = snapshot.getMetaData().getLastIncludedIndex();
} finally {
snapshot.getLock().unlock();
}

// discard old log entries
lock.lock();
try {
if (lastSnapshotIndex > 0 && raftLog.getFirstLogIndex() <= lastSnapshotIndex) {
raftLog.truncatePrefix(lastSnapshotIndex + 1);
}
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
} finally {
snapshot.getIsInSnapshot().compareAndSet(true, false);
Expand Down Expand Up @@ -735,6 +764,17 @@ public void applyConfiguration(RaftMessage.LogEntry entry) {
}
}

/**
 * Returns the term of the last log entry, falling back to the snapshot's
 * last included term when the last log index is below the first log index.
 *
 * @return the term read from the log or, for an empty log, from the snapshot metadata
 */
public long getLastLogTerm() {
    final long tailIndex = raftLog.getLastLogIndex();
    if (tailIndex < raftLog.getFirstLogIndex()) {
        // The log is empty, so lastLogIndex == lastSnapshotIndex.
        // TODO: should this read take the snapshot lock? How would deadlock be avoided?
        return snapshot.getMetaData().getLastIncludedTerm();
    }
    return raftLog.getEntryTerm(tailIndex);
}

/**
 * Returns the lock object stored in this node's {@code lock} field.
 *
 * @return the node's lock
 */
public Lock getLock() {
    return this.lock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,9 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request
List<Peer> requestPeers = new ArrayList<>(request.getServersCount());
for (RaftMessage.Server server : request.getServersList()) {
final Peer peer = new Peer(server);
peer.setNextIndex(raftNode.getRaftLog().getLastLogIndex() + 1);
peer.setNextIndex(1);
requestPeers.add(peer);

raftNode.getLock().lock();
try {
raftNode.getPeerMap().put(server.getServerId(), peer);
} finally {
raftNode.getLock().unlock();
}

raftNode.getPeerMap().put(server.getServerId(), peer);
raftNode.getExecutorService().submit(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.service.RaftConsensusService;
import com.github.wenweihu86.raft.storage.Snapshot;
import com.github.wenweihu86.raft.util.ConfigurationUtils;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -47,8 +48,8 @@ public RaftMessage.VoteResponse requestVote(RaftMessage.VoteRequest request) {
if (request.getTerm() > raftNode.getCurrentTerm()) {
raftNode.stepDown(request.getTerm());
}
boolean logIsOk = request.getLastLogTerm() > raftNode.getRaftLog().getLastLogTerm()
|| (request.getLastLogTerm() == raftNode.getRaftLog().getLastLogTerm()
boolean logIsOk = request.getLastLogTerm() > raftNode.getLastLogTerm()
|| (request.getLastLogTerm() == raftNode.getLastLogTerm()
&& request.getLastLogIndex() >= raftNode.getRaftLog().getLastLogIndex());
if (raftNode.getVotedFor() == 0 && logIsOk) {
raftNode.stepDown(request.getTerm());
Expand Down Expand Up @@ -205,6 +206,7 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
file.mkdir();
}
if (request.getIsFirst()) {
LOG.info("begin accept install snapshot request from serverId={}", request.getServerId());
raftNode.getSnapshot().updateMetaData(tmpSnapshotDir,
request.getSnapshotMetaData().getLastIncludedIndex(),
request.getSnapshotMetaData().getLastIncludedTerm(),
Expand Down Expand Up @@ -234,25 +236,46 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
FileUtils.deleteDirectory(snapshotDirFile);
}
FileUtils.moveDirectory(new File(tmpSnapshotDir), snapshotDirFile);
// apply state machine
// TODO: make this async
String snapshotDataDir = raftNode.getSnapshot().getSnapshotDir() + File.separator + "data";
raftNode.getStateMachine().readSnapshot(snapshotDataDir);
}
responseBuilder.setResCode(RaftMessage.ResCode.RES_CODE_SUCCESS);
LOG.info("installSnapshot request from server {} " +
LOG.info("install snapshot request from server {} " +
"in term {} (my term is {}), resCode={}",
request.getServerId(), request.getTerm(),
raftNode.getCurrentTerm(), responseBuilder.getResCode());
return responseBuilder.build();
} catch (IOException ex) {
LOG.warn("io exception, msg={}", ex.getMessage());
return responseBuilder.build();
LOG.warn("when handle installSnapshot request, meet exception:", ex);
} finally {
RaftFileUtils.closeFile(randomAccessFile);
raftNode.getSnapshot().getLock().unlock();
raftNode.getSnapshot().getIsInSnapshot().compareAndSet(true, false);
}

if (request.getIsLast() && responseBuilder.getResCode() == RaftMessage.ResCode.RES_CODE_SUCCESS) {
// apply state machine
// TODO: make this async
String snapshotDataDir = raftNode.getSnapshot().getSnapshotDir() + File.separator + "data";
raftNode.getStateMachine().readSnapshot(snapshotDataDir);
long lastSnapshotIndex;
// reload the snapshot
raftNode.getSnapshot().getLock().lock();
try {
raftNode.getSnapshot().reload();
lastSnapshotIndex = raftNode.getSnapshot().getMetaData().getLastIncludedIndex();
} finally {
raftNode.getSnapshot().getLock().unlock();
}

// discard old log entries
raftNode.getLock().lock();
try {
raftNode.getRaftLog().truncatePrefix(lastSnapshotIndex + 1);
} finally {
raftNode.getLock().unlock();
}
LOG.info("end accept install snapshot request from serverId={}", request.getServerId());
}

return responseBuilder.build();
}

// in lock, for follower
Expand All @@ -265,10 +288,12 @@ private void advanceCommitIndex(RaftMessage.AppendEntriesRequest request) {
for (long index = raftNode.getLastAppliedIndex() + 1;
index <= raftNode.getCommitIndex(); index++) {
RaftMessage.LogEntry entry = raftNode.getRaftLog().getEntry(index);
if (entry.getType() == RaftMessage.EntryType.ENTRY_TYPE_DATA) {
raftNode.getStateMachine().apply(entry.getData().toByteArray());
} else if (entry.getType() == RaftMessage.EntryType.ENTRY_TYPE_CONFIGURATION) {
raftNode.applyConfiguration(entry);
if (entry != null) {
if (entry.getType() == RaftMessage.EntryType.ENTRY_TYPE_DATA) {
raftNode.getStateMachine().apply(entry.getData().toByteArray());
} else if (entry.getType() == RaftMessage.EntryType.ENTRY_TYPE_CONFIGURATION) {
raftNode.applyConfiguration(entry);
}
}
raftNode.setLastAppliedIndex(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SegmentedLog() {
LOG.error("No readable metadata file but found segments in {}", logDir);
throw new RuntimeException("No readable metadata file but found segments");
}
metaData = RaftMessage.LogMetaData.newBuilder().setFirstLogIndex(0).build();
metaData = RaftMessage.LogMetaData.newBuilder().setFirstLogIndex(1).build();
}
}

Expand Down Expand Up @@ -72,29 +72,20 @@ public long getEntryTerm(long index) {
}

public long getFirstLogIndex() {
if (startLogIndexSegmentMap.size() == 0) {
return 0;
}
Segment firstSegment = startLogIndexSegmentMap.firstEntry().getValue();
return firstSegment.getStartIndex();
return metaData.getFirstLogIndex();
}

public long getLastLogIndex() {
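// There are two cases in which the segment map is empty:
// 1. First initialization: firstLogIndex = 1, lastLogIndex = 0
// 2. A snapshot has just completed and the log was truncated entirely: firstLogIndex = snapshotIndex + 1, lastLogIndex = snapshotIndex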
if (startLogIndexSegmentMap.size() == 0) {
return 0;
return getFirstLogIndex() - 1;
}
Segment lastSegment = startLogIndexSegmentMap.lastEntry().getValue();
return lastSegment.getEndIndex();
}

/**
 * Returns the term of the last log entry, or 0 when the last log index is 0.
 *
 * @return the last entry's term, or 0 if {@code getLastLogIndex()} reports 0
 */
public long getLastLogTerm() {
    final long tailIndex = this.getLastLogIndex();
    return tailIndex == 0 ? 0 : this.getEntryTerm(tailIndex);
}

public long append(List<RaftMessage.LogEntry> entries) {
long newLastLogIndex = this.getLastLogIndex();
for (RaftMessage.LogEntry entry : entries) {
Expand Down Expand Up @@ -187,7 +178,12 @@ public void truncatePrefix(long newFirstIndex) {
break;
}
}
long newActualFirstIndex = getFirstLogIndex();
long newActualFirstIndex;
if (startLogIndexSegmentMap.size() == 0) {
newActualFirstIndex = newFirstIndex;
} else {
newActualFirstIndex = getFirstLogIndex();
}
updateMetaData(null, null, newActualFirstIndex);
LOG.info("Truncating log from old first index {} to new first index {}",
oldFirstIndex, newActualFirstIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public Snapshot() {
if (!file.exists()) {
file.mkdirs();
}
}

public void reload() {
if (snapshotDataFileMap != null) {
for (SnapshotDataFile file : snapshotDataFileMap.values()) {
RaftFileUtils.closeFile(file.randomAccessFile);
}
}
this.snapshotDataFileMap = readSnapshotDataFiles();
metaData = this.readMetaData();
if (metaData == null) {
Expand Down Expand Up @@ -87,7 +95,6 @@ public void updateMetaData(String dir,
.setLastIncludedIndex(lastIncludedIndex)
.setLastIncludedTerm(lastIncludedTerm)
.setConfiguration(configuration).build();
this.metaData = snapshotMetaData;
String snapshotMetaFile = dir + File.separator + "metadata";
RandomAccessFile randomAccessFile = null;
try {
Expand All @@ -102,7 +109,7 @@ public void updateMetaData(String dir,
}
file.createNewFile();
randomAccessFile = new RandomAccessFile(file, "rw");
RaftFileUtils.writeProtoToFile(randomAccessFile, metaData);
RaftFileUtils.writeProtoToFile(randomAccessFile, snapshotMetaData);
} catch (IOException ex) {
LOG.warn("meta file not exist, name={}", snapshotMetaFile);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void testTruncateSuffix() throws IOException {
RaftOptions.dataDir = "./data";
RaftOptions.maxSegmentFileSize = 32;
SegmentedLog segmentedLog = new SegmentedLog();
Assert.assertTrue(segmentedLog.getFirstLogIndex() == 0);
Assert.assertTrue(segmentedLog.getFirstLogIndex() == 1);

List<RaftMessage.LogEntry> entries = new ArrayList<>();
for (int i = 1; i < 10; i++) {
Expand All @@ -38,9 +38,6 @@ public void testTruncateSuffix() throws IOException {
Assert.assertTrue(lastLogIndex == 9);

segmentedLog.truncatePrefix(5);
long firstLogIndex = segmentedLog.getFirstLogIndex();
Assert.assertTrue(firstLogIndex == 5);

FileUtils.deleteDirectory(new File(RaftOptions.dataDir));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public void writeSnapshot(String snapshotDir) {
public void readSnapshot(String snapshotDir) {
try {
// copy snapshot dir to data dir
if (db != null) {
db.close();
}
String dataDir = RaftOptions.dataDir + File.separator + "rocksdb_data";
File dataFile = new File(dataDir);
if (dataFile.exists()) {
Expand Down

0 comments on commit ecacf67

Please sign in to comment.