Skip to content

Commit

Permalink
fix snapshot bug
Browse files: browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 30, 2017
1 parent 7180184 commit 1461a58
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 209 deletions.
2 changes: 1 addition & 1 deletion raft-java-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
6 changes: 3 additions & 3 deletions raft-java-admin/start_admin.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

java -cp dependency/*:raft-java-admin-1.2.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf get
java -cp dependency/*:raft-java-admin-1.3.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf get

java -cp dependency/*:raft-java-admin-1.2.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf add "127.0.0.1:8054:4,127.0.0.1:8055:5"
java -cp dependency/*:raft-java-admin-1.3.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf add "127.0.0.1:8054:4,127.0.0.1:8055:5"

java -cp dependency/*:raft-java-admin-1.2.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf del "127.0.0.1:8054:4,127.0.0.1:8055:5"
java -cp dependency/*:raft-java-admin-1.3.0.jar com.github.wenweihu86.raft.admin.AdminMain "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" conf del "127.0.0.1:8054:4,127.0.0.1:8055:5"
138 changes: 84 additions & 54 deletions raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,43 +496,57 @@ private boolean installSnapshot(Peer peer) {
return false;
}

LOG.info("begin installSnapshot");
LOG.info("begin install snapshot");
boolean isSuccess = true;
boolean isLastRequest = false;
String lastFileName = null;
long lastOffset = 0;
long lastLength = 0;
while (!isLastRequest) {
RaftMessage.InstallSnapshotRequest request
= buildInstallSnapshotRequest(lastFileName, lastOffset, lastLength);
if (request == null) {
isSuccess = false;
break;
}
if (request.getIsLast()) {
isLastRequest = true;
}
RaftMessage.InstallSnapshotResponse response = peer.getRaftConsensusService().installSnapshot(request);
if (response != null && response.getResCode() == RaftMessage.ResCode.RES_CODE_SUCCESS) {
lastFileName = request.getFileName();
lastOffset = request.getOffset();
lastLength = request.getData().size();
} else {
isSuccess = false;
break;
try {
boolean isLastRequest = false;
String lastFileName = null;
long lastOffset = 0;
long lastLength = 0;
while (!isLastRequest) {
RaftMessage.InstallSnapshotRequest request
= buildInstallSnapshotRequest(lastFileName, lastOffset, lastLength);
if (request == null) {
LOG.warn("snapshot request == null");
isSuccess = false;
break;
}
if (request.getIsLast()) {
isLastRequest = true;
}
LOG.info("install snapshot request, fileName={}, offset={}, size={}, isFirst={}, isLast={}",
request.getFileName(), request.getOffset(), request.getData().toByteArray().length,
request.getIsFirst(), request.getIsLast());
RaftMessage.InstallSnapshotResponse response = peer.getRaftConsensusService().installSnapshot(request);
if (response != null && response.getResCode() == RaftMessage.ResCode.RES_CODE_SUCCESS) {
lastFileName = request.getFileName();
lastOffset = request.getOffset();
lastLength = request.getData().size();
} else {
isSuccess = false;
break;
}
}
}

lock.lock();
try {
if (isSuccess) {
peer.setNextIndex(snapshot.getMetaData().getLastIncludedIndex() + 1);
long lastIncludedIndexInSnapshot;
snapshot.getLock().lock();
try {
lastIncludedIndexInSnapshot = snapshot.getMetaData().getLastIncludedIndex();
} finally {
snapshot.getLock().unlock();
}

lock.lock();
try {
peer.setNextIndex(lastIncludedIndexInSnapshot + 1);
} finally {
lock.unlock();
}
}
} finally {
lock.unlock();
snapshot.getIsInSnapshot().compareAndSet(true, false);
}

snapshot.getIsInSnapshot().compareAndSet(true, false);
LOG.info("install snapshot success={}", isSuccess);
return isSuccess;
}
Expand Down Expand Up @@ -563,6 +577,7 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
Map.Entry<String, Snapshot.SnapshotDataFile> currentEntry
= snapshotDataFileMap.higherEntry(lastFileName);
if (currentEntry == null) {
LOG.warn("reach the last file={}", lastFileName);
return null;
}
currentDataFile = currentEntry.getValue();
Expand All @@ -574,6 +589,7 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
}
}
byte[] currentData = new byte[currentDataSize];
currentDataFile.randomAccessFile.seek(currentOffset);
currentDataFile.randomAccessFile.read(currentData);
requestBuilder.setData(ByteString.copyFrom(currentData));
requestBuilder.setFileName(currentFileName);
Expand All @@ -591,7 +607,8 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
} else {
requestBuilder.setIsFirst(false);
}
} catch (IOException ex) {
} catch (Exception ex) {
LOG.warn("meet exception:", ex);
return null;
} finally {
snapshot.getLock().unlock();
Expand Down Expand Up @@ -630,32 +647,34 @@ public void stepDown(long newTerm) {

private void takeSnapshot() {
if (!snapshot.getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("already in snapshot");
LOG.info("already in snapshot, ignore takeSnapshot");
return;
}

long localLastAppliedIndex;
long lastAppliedTerm = 0;
RaftMessage.Configuration.Builder localConfiguration = RaftMessage.Configuration.newBuilder();
lock.lock();
try {
if (raftLog.getTotalSize() < RaftOptions.snapshotMinLogSize) {
return;
}
if (lastAppliedIndex <= snapshot.getMetaData().getLastIncludedIndex()) {
return;
}
localLastAppliedIndex = lastAppliedIndex;
if (lastAppliedIndex >= raftLog.getFirstLogIndex()
&& lastAppliedIndex <= raftLog.getLastLogIndex()) {
lastAppliedTerm = raftLog.getEntryTerm(lastAppliedIndex);
long localLastAppliedIndex;
long lastAppliedTerm = 0;
RaftMessage.Configuration.Builder localConfiguration = RaftMessage.Configuration.newBuilder();
lock.lock();
try {
if (raftLog.getTotalSize() < RaftOptions.snapshotMinLogSize) {
return;
}
if (lastAppliedIndex <= snapshot.getMetaData().getLastIncludedIndex()) {
return;
}
localLastAppliedIndex = lastAppliedIndex;
if (lastAppliedIndex >= raftLog.getFirstLogIndex()
&& lastAppliedIndex <= raftLog.getLastLogIndex()) {
lastAppliedTerm = raftLog.getEntryTerm(lastAppliedIndex);
}
localConfiguration.mergeFrom(configuration);
} finally {
lock.unlock();
}
localConfiguration.mergeFrom(configuration);
} finally {
lock.unlock();
}

if (snapshot.getLock().tryLock()) {
long snapshotLastIncludedIndex = 0;
snapshot.getLock().lock();
try {
LOG.info("start taking snapshot");
// take snapshot
Expand All @@ -672,16 +691,27 @@ private void takeSnapshot() {
}
FileUtils.moveDirectory(new File(tmpSnapshotDir),
new File(snapshot.getSnapshotDir()));
LOG.info("end taking snapshot, result=success");
snapshotLastIncludedIndex = snapshot.getMetaData().getLastIncludedIndex();
} catch (IOException ex) {
LOG.warn("move direct failed when taking snapshot, msg={}", ex.getMessage());
}
LOG.info("end taking snapshot");
} finally {
snapshot.getLock().unlock();
}

// discard old log entries
lock.lock();
try {
if (snapshotLastIncludedIndex > 0 && raftLog.getFirstLogIndex() <= snapshotLastIncludedIndex) {
raftLog.truncatePrefix(snapshotLastIncludedIndex + 1);
}
} finally {
lock.unlock();
}
} finally {
snapshot.getIsInSnapshot().compareAndSet(true, false);
}

snapshot.getIsInSnapshot().compareAndSet(true, false);
}

// in lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request
requestPeers.add(peer);

raftNode.getLock().lock();
raftNode.getPeerMap().put(server.getServerId(), peer);
raftNode.getLock().unlock();
try {
raftNode.getPeerMap().put(server.getServerId(), peer);
} finally {
raftNode.getLock().unlock();
}

raftNode.getExecutorService().submit(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public RaftMessage.AppendEntriesResponse appendEntries(RaftMessage.AppendEntries
public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSnapshotRequest request) {
RaftMessage.InstallSnapshotResponse.Builder responseBuilder
= RaftMessage.InstallSnapshotResponse.newBuilder();
responseBuilder.setResCode(RaftMessage.ResCode.RES_CODE_FAIL);

raftNode.getLock().lock();
try {
Expand All @@ -188,6 +189,12 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
raftNode.getLock().unlock();
}

if (!raftNode.getSnapshot().getIsInSnapshot().compareAndSet(false, true)) {
LOG.warn("alreay in snapshot");
return responseBuilder.build();
}

RandomAccessFile randomAccessFile = null;
raftNode.getSnapshot().getLock().lock();
try {
// write snapshot data to local
Expand All @@ -204,45 +211,47 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
request.getSnapshotMetaData().getConfiguration());
}
// write to file
RandomAccessFile randomAccessFile = null;
try {
String currentDataFileName = tmpSnapshotDir + File.separator
+ "data" + File.separator + request.getFileName();
File currentDataFile = new File(currentDataFileName);
if (!currentDataFile.exists()) {
currentDataFile.createNewFile();
}
randomAccessFile = RaftFileUtils.openFile(
tmpSnapshotDir + File.separator + "data",
request.getFileName(), "rw");
randomAccessFile.skipBytes((int) request.getOffset());
randomAccessFile.write(request.getData().toByteArray());
RaftFileUtils.closeFile(randomAccessFile);
// move tmp dir to snapshot dir if this is the last package
if (request.getIsLast()) {
File snapshotDirFile = new File(raftNode.getSnapshot().getSnapshotDir());
if (snapshotDirFile.exists()) {
snapshotDirFile.delete();
}
FileUtils.moveDirectory(new File(tmpSnapshotDir), snapshotDirFile);
// apply state machine
// TODO: make this async
String snapshotDataDir = raftNode.getSnapshot().getSnapshotDir() + File.separator + "data";
raftNode.getStateMachine().readSnapshot(snapshotDataDir);
String currentDataDirName = tmpSnapshotDir + File.separator + "data";
File currentDataDir = new File(currentDataDirName);
if (!currentDataDir.exists()) {
currentDataDir.mkdirs();
}

String currentDataFileName = currentDataDirName + File.separator + request.getFileName();
File currentDataFile = new File(currentDataFileName);
if (!currentDataFile.exists()) {
currentDataFile.createNewFile();
}
randomAccessFile = RaftFileUtils.openFile(
tmpSnapshotDir + File.separator + "data",
request.getFileName(), "rw");
randomAccessFile.seek(request.getOffset());
randomAccessFile.write(request.getData().toByteArray());
// move tmp dir to snapshot dir if this is the last package
if (request.getIsLast()) {
File snapshotDirFile = new File(raftNode.getSnapshot().getSnapshotDir());
if (snapshotDirFile.exists()) {
FileUtils.deleteDirectory(snapshotDirFile);
}
responseBuilder.setResCode(RaftMessage.ResCode.RES_CODE_SUCCESS);
} catch (IOException ex) {
LOG.warn("io exception, msg={}", ex.getMessage());
} finally {
RaftFileUtils.closeFile(randomAccessFile);
FileUtils.moveDirectory(new File(tmpSnapshotDir), snapshotDirFile);
// apply state machine
// TODO: make this async
String snapshotDataDir = raftNode.getSnapshot().getSnapshotDir() + File.separator + "data";
raftNode.getStateMachine().readSnapshot(snapshotDataDir);
}
responseBuilder.setResCode(RaftMessage.ResCode.RES_CODE_SUCCESS);
LOG.info("installSnapshot request from server {} " +
"in term {} (my term is {}), resCode={}",
request.getServerId(), request.getTerm(),
raftNode.getCurrentTerm(), responseBuilder.getResCode());
return responseBuilder.build();
} catch (IOException ex) {
LOG.warn("io exception, msg={}", ex.getMessage());
return responseBuilder.build();
} finally {
RaftFileUtils.closeFile(randomAccessFile);
raftNode.getSnapshot().getLock().unlock();
raftNode.getSnapshot().getIsInSnapshot().compareAndSet(true, false);
}
}

Expand Down
Loading

0 comments on commit 1461a58

Please sign in to comment.