Skip to content

Commit

Permalink
separate snapshot lock and raft node lock
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 29, 2017
1 parent 2ddab5f commit 979eff3
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 69 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-parent</artifactId>
<version>1.2.0</version>
<artifactId>raft-java-all</artifactId>
<version>1.3.0</version>
<packaging>pom</packaging>

<name>raft-java-parent</name>
<name>raft-java-all</name>
<url>https://maven.apache.org</url>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion raft-java-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-admin</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
<packaging>jar</packaging>

<name>raft-java-admin</name>
Expand Down
2 changes: 1 addition & 1 deletion raft-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
<packaging>jar</packaging>

<name>raft-java-core</name>
Expand Down
145 changes: 84 additions & 61 deletions raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public enum NodeState {
private long commitIndex;
// 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增)
private volatile long lastAppliedIndex;
private volatile boolean isInSnapshot;

private Lock lock = new ReentrantLock();
private Condition commitIndexCondition = lock.newCondition();
Expand Down Expand Up @@ -353,23 +352,28 @@ public void appendEntries(Peer peer) {
RaftMessage.AppendEntriesRequest.Builder requestBuilder = RaftMessage.AppendEntriesRequest.newBuilder();
long prevLogIndex;
long numEntries;

boolean isNeedInstallSnapshot = false;
lock.lock();
try {
long firstLogIndex = raftLog.getFirstLogIndex();
if (peer.getNextIndex() < firstLogIndex) {
lock.unlock();
boolean success;
try {
success = installSnapshot(peer);
} finally {
lock.lock();
}
if (!success) {
return;
}
isNeedInstallSnapshot = true;
}
Validate.isTrue(peer.getNextIndex() >= firstLogIndex);
} finally {
lock.unlock();
}

if (isNeedInstallSnapshot) {
if (!installSnapshot(peer)) {
return;
}
}

lock.lock();
try {
long firstLogIndex = raftLog.getFirstLogIndex();
Validate.isTrue(peer.getNextIndex() >= firstLogIndex);
prevLogIndex = peer.getNextIndex() - 1;
long prevLogTerm;
if (prevLogIndex == 0) {
Expand Down Expand Up @@ -487,22 +491,20 @@ private long packEntries(long nextIndex, RaftMessage.AppendEntriesRequest.Builde
}

private boolean installSnapshot(Peer peer) {
if (!snapshot.getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("already in snapshot");
return false;
}

LOG.info("begin installSnapshot");
boolean isSuccess = true;
boolean isLastRequest = false;
String lastFileName = null;
long lastOffset = 0;
long lastLength = 0;
while (!isLastRequest) {
lock.lock();
RaftMessage.InstallSnapshotRequest request = null;
try {
isInSnapshot = true;
request = buildInstallSnapshotRequest(
lastFileName, lastOffset, lastLength);
} finally {
lock.unlock();
}
RaftMessage.InstallSnapshotRequest request
= buildInstallSnapshotRequest(lastFileName, lastOffset, lastLength);
if (request == null) {
isSuccess = false;
break;
Expand All @@ -526,18 +528,20 @@ private boolean installSnapshot(Peer peer) {
if (isSuccess) {
peer.setNextIndex(snapshot.getMetaData().getLastIncludedIndex() + 1);
}
isInSnapshot = false;
} finally {
lock.unlock();
}

snapshot.getIsInSnapshot().compareAndSet(true, false);
LOG.info("install snapshot success={}", isSuccess);
return isSuccess;
}

// in lock
private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
String lastFileName, long lastOffset, long lastLength) {
RaftMessage.InstallSnapshotRequest.Builder requestBuilder = RaftMessage.InstallSnapshotRequest.newBuilder();

snapshot.getLock().lock();
try {
TreeMap<String, Snapshot.SnapshotDataFile> snapshotDataFileMap = snapshot.getSnapshotDataFileMap();
if (lastFileName == null) {
Expand Down Expand Up @@ -574,8 +578,6 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
requestBuilder.setData(ByteString.copyFrom(currentData));
requestBuilder.setFileName(currentFileName);
requestBuilder.setOffset(currentOffset);
requestBuilder.setTerm(currentTerm);
requestBuilder.setServerId(localServer.getServerId());
requestBuilder.setIsFirst(false);
if (currentFileName.equals(snapshotDataFileMap.lastKey())
&& currentOffset + currentDataSize >= currentDataFile.randomAccessFile.length()) {
Expand All @@ -585,13 +587,25 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
}
if (currentFileName.equals(snapshotDataFileMap.firstKey()) && currentOffset == 0) {
requestBuilder.setIsFirst(true);
requestBuilder.setSnapshotMetaData(snapshot.getMetaData());
} else {
requestBuilder.setIsFirst(false);
}
return requestBuilder.build();
} catch (IOException ex) {
return null;
} finally {
snapshot.getLock().unlock();
}

lock.lock();
try {
requestBuilder.setTerm(currentTerm);
requestBuilder.setServerId(localServer.getServerId());
} finally {
lock.unlock();
}

return requestBuilder.build();
}

// in lock
Expand All @@ -615,50 +629,59 @@ public void stepDown(long newTerm) {
}

private void takeSnapshot() {
if (isInSnapshot) {
if (!snapshot.getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("already in snapshot");
return;
}
if (lock.tryLock()) {
try {
if (raftLog.getTotalSize() < RaftOptions.snapshotMinLogSize) {
return;
}
if (lastAppliedIndex <= snapshot.getMetaData().getLastIncludedIndex()) {
return;
}
long lastAppliedTerm = 0;
if (lastAppliedIndex >= raftLog.getFirstLogIndex()
&& lastAppliedIndex <= raftLog.getLastLogIndex()) {
lastAppliedTerm = raftLog.getEntryTerm(lastAppliedIndex);
}

if (!isInSnapshot) {
isInSnapshot = true;
LOG.info("start taking snapshot");
// take snapshot
String tmpSnapshotDir = snapshot.getSnapshotDir() + ".tmp";
snapshot.updateMetaData(tmpSnapshotDir,
lastAppliedIndex, lastAppliedTerm, configuration);
String tmpSnapshotDataDir = tmpSnapshotDir + File.separator + "data";
stateMachine.writeSnapshot(tmpSnapshotDataDir);
// rename tmp snapshot dir to snapshot dir
try {
File snapshotDirFile = new File(snapshot.getSnapshotDir());
if (snapshotDirFile.exists()) {
FileUtils.deleteDirectory(snapshotDirFile);
}
FileUtils.moveDirectory(new File(tmpSnapshotDir),
new File(snapshot.getSnapshotDir()));
} catch (IOException ex) {
LOG.warn("move direct failed when taking snapshot, msg={}", ex.getMessage());
long localLastAppliedIndex;
long lastAppliedTerm = 0;
RaftMessage.Configuration.Builder localConfiguration = RaftMessage.Configuration.newBuilder();
lock.lock();
try {
if (raftLog.getTotalSize() < RaftOptions.snapshotMinLogSize) {
return;
}
if (lastAppliedIndex <= snapshot.getMetaData().getLastIncludedIndex()) {
return;
}
localLastAppliedIndex = lastAppliedIndex;
if (lastAppliedIndex >= raftLog.getFirstLogIndex()
&& lastAppliedIndex <= raftLog.getLastLogIndex()) {
lastAppliedTerm = raftLog.getEntryTerm(lastAppliedIndex);
}
localConfiguration.mergeFrom(configuration);
} finally {
lock.unlock();
}

if (snapshot.getLock().tryLock()) {
try {
LOG.info("start taking snapshot");
// take snapshot
String tmpSnapshotDir = snapshot.getSnapshotDir() + ".tmp";
snapshot.updateMetaData(tmpSnapshotDir, localLastAppliedIndex,
lastAppliedTerm, localConfiguration.build());
String tmpSnapshotDataDir = tmpSnapshotDir + File.separator + "data";
stateMachine.writeSnapshot(tmpSnapshotDataDir);
// rename tmp snapshot dir to snapshot dir
try {
File snapshotDirFile = new File(snapshot.getSnapshotDir());
if (snapshotDirFile.exists()) {
FileUtils.deleteDirectory(snapshotDirFile);
}
LOG.info("end taking snapshot");
FileUtils.moveDirectory(new File(tmpSnapshotDir),
new File(snapshot.getSnapshotDir()));
} catch (IOException ex) {
LOG.warn("move direct failed when taking snapshot, msg={}", ex.getMessage());
}
LOG.info("end taking snapshot");
} finally {
isInSnapshot = false;
lock.unlock();
}
}

snapshot.getIsInSnapshot().compareAndSet(true, false);
}

// in lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ public RaftMessage.AppendEntriesResponse appendEntries(RaftMessage.AppendEntries

@Override
public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSnapshotRequest request) {
RaftMessage.InstallSnapshotResponse.Builder responseBuilder
= RaftMessage.InstallSnapshotResponse.newBuilder();

raftNode.getLock().lock();
try {
RaftMessage.InstallSnapshotResponse.Builder responseBuilder
= RaftMessage.InstallSnapshotResponse.newBuilder();
responseBuilder.setTerm(raftNode.getCurrentTerm());
if (request.getTerm() < raftNode.getCurrentTerm()) {
return responseBuilder.build();
Expand All @@ -183,7 +184,12 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
ex.printStackTrace();
}
}
} finally {
raftNode.getLock().unlock();
}

raftNode.getSnapshot().getLock().lock();
try {
// write snapshot data to local
String tmpSnapshotDir = raftNode.getSnapshot().getSnapshotDir() + ".tmp";
File file = new File(tmpSnapshotDir);
Expand Down Expand Up @@ -236,7 +242,7 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
raftNode.getCurrentTerm(), responseBuilder.getResCode());
return responseBuilder.build();
} finally {
raftNode.getLock().unlock();
raftNode.getSnapshot().getLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by wenweihu86 on 2017/5/6.
Expand All @@ -28,6 +31,8 @@ public class SnapshotDataFile {
private String snapshotDir = RaftOptions.dataDir + File.separator + "snapshot";
private RaftMessage.SnapshotMetaData metaData;
private TreeMap<String, SnapshotDataFile> snapshotDataFileMap;
private AtomicBoolean isInSnapshot = new AtomicBoolean(false);
private Lock lock = new ReentrantLock();

public Snapshot() {
String snapshotDataDir = snapshotDir + File.separator + "data";
Expand Down Expand Up @@ -116,4 +121,16 @@ public String getSnapshotDir() {
public TreeMap<String, SnapshotDataFile> getSnapshotDataFileMap() {
return snapshotDataFileMap;
}

public AtomicBoolean getIsInSnapshot() {
return isInSnapshot;
}

public void setIsInSnapshot(AtomicBoolean isInSnapshot) {
this.isInSnapshot = isInSnapshot;
}

public Lock getLock() {
return lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
public class ConfigurationUtils {

// configuration不会太大,所以这里直接遍历了
public static boolean containsServer(RaftMessage.Configuration configuration, int serverId) {
for (RaftMessage.Server server : configuration.getServersList()) {
if (server.getServerId() == serverId) {
Expand Down

0 comments on commit 979eff3

Please sign in to comment.