Skip to content

Commit

Permalink
update RaftOptions member to private
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Sep 16, 2017
1 parent 2eada10 commit 7e6d26b
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 56 deletions.
4 changes: 2 additions & 2 deletions raft-java-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-admin</artifactId>
<version>1.4.0</version>
<version>1.5.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>raft-java-admin</name>
Expand Down Expand Up @@ -34,7 +34,7 @@
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.3.0</version>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
2 changes: 1 addition & 1 deletion raft-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.4.0</version>
<version>1.5.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>raft-java-core</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum NodeState {
private static final Logger LOG = LoggerFactory.getLogger(RaftNode.class);
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();

private RaftOptions raftOptions;
private RaftMessage.Configuration configuration;
private ConcurrentMap<Integer, Peer> peerMap = new ConcurrentHashMap<>();
private RaftMessage.Server localServer;
Expand Down Expand Up @@ -60,7 +61,11 @@ public enum NodeState {
private ScheduledFuture electionScheduledFuture;
private ScheduledFuture heartbeatScheduledFuture;

public RaftNode(List<RaftMessage.Server> servers, RaftMessage.Server localServer, StateMachine stateMachine) {
public RaftNode(RaftOptions raftOptions,
List<RaftMessage.Server> servers,
RaftMessage.Server localServer,
StateMachine stateMachine) {
this.raftOptions = raftOptions;
RaftMessage.Configuration.Builder confBuilder = RaftMessage.Configuration.newBuilder();
for (RaftMessage.Server server : servers) {
confBuilder.addServers(server);
Expand All @@ -71,8 +76,8 @@ public RaftNode(List<RaftMessage.Server> servers, RaftMessage.Server localServer
this.stateMachine = stateMachine;

// load log and snapshot
raftLog = new SegmentedLog();
snapshot = new Snapshot();
raftLog = new SegmentedLog(raftOptions.getDataDir(), raftOptions.getMaxSegmentFileSize());
snapshot = new Snapshot(raftOptions.getDataDir());
snapshot.reload();

currentTerm = raftLog.getMetaData().getCurrentTerm();
Expand Down Expand Up @@ -114,8 +119,8 @@ public void init() {

// init thread pool
executorService = new ThreadPoolExecutor(
RaftOptions.raftConsensusThreadNum,
RaftOptions.raftConsensusThreadNum,
raftOptions.getRaftConsensusThreadNum(),
raftOptions.getRaftConsensusThreadNum(),
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
Expand All @@ -125,7 +130,7 @@ public void init() {
public void run() {
takeSnapshot();
}
}, RaftOptions.snapshotPeriodSeconds, RaftOptions.snapshotPeriodSeconds, TimeUnit.SECONDS);
}, raftOptions.getSnapshotPeriodSeconds(), raftOptions.getSnapshotPeriodSeconds(), TimeUnit.SECONDS);
// start election
resetElectionTimer();
}
Expand Down Expand Up @@ -161,10 +166,10 @@ public void run() {
// sync wait commitIndex >= newLastLogIndex
long startTime = System.currentTimeMillis();
while (lastAppliedIndex < newLastLogIndex) {
if (System.currentTimeMillis() - startTime >= RaftOptions.maxAwaitTimeout) {
if (System.currentTimeMillis() - startTime >= raftOptions.getMaxAwaitTimeout()) {
break;
}
commitIndexCondition.await(RaftOptions.maxAwaitTimeout, TimeUnit.MILLISECONDS);
commitIndexCondition.await(raftOptions.getMaxAwaitTimeout(), TimeUnit.MILLISECONDS);
}
} catch (Exception ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -193,8 +198,8 @@ public void run() {

private int getElectionTimeoutMs() {
ThreadLocalRandom random = ThreadLocalRandom.current();
int randomElectionTimeout = RaftOptions.electionTimeoutMilliseconds
+ random.nextInt(0, RaftOptions.electionTimeoutMilliseconds);
int randomElectionTimeout = raftOptions.getElectionTimeoutMilliseconds()
+ random.nextInt(0, raftOptions.getElectionTimeoutMilliseconds());
LOG.debug("new election time is after {} ms", randomElectionTimeout);
return randomElectionTimeout;
}
Expand Down Expand Up @@ -337,7 +342,7 @@ private void resetHeartbeatTimer() {
public void run() {
startNewHeartbeat();
}
}, RaftOptions.heartbeatPeriodMilliseconds, TimeUnit.MILLISECONDS);
}, raftOptions.getHeartbeatPeriodMilliseconds(), TimeUnit.MILLISECONDS);
}

// in lock, 开始心跳,对leader有效
Expand Down Expand Up @@ -439,7 +444,7 @@ public void appendEntries(Peer peer) {
if (ConfigurationUtils.containsServer(configuration, peer.getServer().getServerId())) {
advanceCommitIndex();
} else {
if (raftLog.getLastLogIndex() - peer.getMatchIndex() <= RaftOptions.catchupMargin) {
if (raftLog.getLastLogIndex() - peer.getMatchIndex() <= raftOptions.getCatchupMargin()) {
LOG.debug("peer catch up the leader");
peer.setCatchUp(true);
// signal the caller thread
Expand Down Expand Up @@ -499,7 +504,7 @@ private void advanceCommitIndex() {
// in lock
private long packEntries(long nextIndex, RaftMessage.AppendEntriesRequest.Builder requestBuilder) {
long lastIndex = Math.min(raftLog.getLastLogIndex(),
nextIndex + RaftOptions.maxLogEntriesPerRequest - 1);
nextIndex + raftOptions.getMaxLogEntriesPerRequest() - 1);
for (long index = nextIndex; index <= lastIndex; index++) {
RaftMessage.LogEntry entry = raftLog.getEntry(index);
requestBuilder.addEntries(entry);
Expand Down Expand Up @@ -592,10 +597,10 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
long lastFileLength = lastFile.randomAccessFile.length();
String currentFileName = lastFileName;
long currentOffset = lastOffset + lastLength;
int currentDataSize = RaftOptions.maxSnapshotBytesPerRequest;
int currentDataSize = raftOptions.getMaxSnapshotBytesPerRequest();
Snapshot.SnapshotDataFile currentDataFile = lastFile;
if (lastOffset + lastLength < lastFileLength) {
if (lastOffset + lastLength + RaftOptions.maxSnapshotBytesPerRequest > lastFileLength) {
if (lastOffset + lastLength + raftOptions.getMaxSnapshotBytesPerRequest() > lastFileLength) {
currentDataSize = (int) (lastFileLength - (lastOffset + lastLength));
}
} else {
Expand All @@ -609,7 +614,7 @@ private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
currentFileName = currentEntry.getKey();
currentOffset = 0;
int currentFileLenght = (int) currentEntry.getValue().randomAccessFile.length();
if (currentFileLenght < RaftOptions.maxSnapshotBytesPerRequest) {
if (currentFileLenght < raftOptions.getMaxSnapshotBytesPerRequest()) {
currentDataSize = currentFileLenght;
}
}
Expand Down Expand Up @@ -683,7 +688,7 @@ public void takeSnapshot() {
RaftMessage.Configuration.Builder localConfiguration = RaftMessage.Configuration.newBuilder();
lock.lock();
try {
if (raftLog.getTotalSize() < RaftOptions.snapshotMinLogSize) {
if (raftLog.getTotalSize() < raftOptions.getSnapshotMinLogSize()) {
return;
}
if (lastAppliedIndex <= snapshot.getMetaData().getLastIncludedIndex()) {
Expand Down
111 changes: 100 additions & 11 deletions raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,120 @@ public class RaftOptions {

// A follower would become a candidate if it doesn't receive any message
// from the leader in electionTimeoutMs milliseconds
public static int electionTimeoutMilliseconds = 5000;
private int electionTimeoutMilliseconds = 5000;

// A leader sends RPCs at least this often, even if there is no data to send
public static int heartbeatPeriodMilliseconds = 500;
private int heartbeatPeriodMilliseconds = 500;

// snapshot定时器执行间隔
public static int snapshotPeriodSeconds = 3600;
private int snapshotPeriodSeconds = 3600;
// log entry大小达到snapshotMinLogSize,才做snapshot
public static int snapshotMinLogSize = 100 * 1024 * 1024;
public static int maxSnapshotBytesPerRequest = 500 * 1024; // 500k
private int snapshotMinLogSize = 100 * 1024 * 1024;
private int maxSnapshotBytesPerRequest = 500 * 1024; // 500k

public static int maxLogEntriesPerRequest = 5000;
private int maxLogEntriesPerRequest = 5000;

// 单个segment文件大小,默认100m
public static int maxSegmentFileSize = 100 * 1000 * 1000;
private int maxSegmentFileSize = 100 * 1000 * 1000;

// follower与leader差距在catchupMargin,才可以参与选举和提供服务
public static long catchupMargin = 500;
private long catchupMargin = 500;

// replicate最大等待超时时间,单位ms
public static long maxAwaitTimeout = 1000;
private long maxAwaitTimeout = 1000;

// 与其他节点进行同步、选主等操作的线程池大小
public static int raftConsensusThreadNum = 20;
private int raftConsensusThreadNum = 20;

// raft的log和snapshot父目录,绝对路径
public static String dataDir = System.getProperty("com.github.wenweihu86.raft.data.dir");
private String dataDir = System.getProperty("com.github.wenweihu86.raft.data.dir");

public int getElectionTimeoutMilliseconds() {
return electionTimeoutMilliseconds;
}

public void setElectionTimeoutMilliseconds(int electionTimeoutMilliseconds) {
this.electionTimeoutMilliseconds = electionTimeoutMilliseconds;
}

public int getHeartbeatPeriodMilliseconds() {
return heartbeatPeriodMilliseconds;
}

public void setHeartbeatPeriodMilliseconds(int heartbeatPeriodMilliseconds) {
this.heartbeatPeriodMilliseconds = heartbeatPeriodMilliseconds;
}

public int getSnapshotPeriodSeconds() {
return snapshotPeriodSeconds;
}

public void setSnapshotPeriodSeconds(int snapshotPeriodSeconds) {
this.snapshotPeriodSeconds = snapshotPeriodSeconds;
}

public int getSnapshotMinLogSize() {
return snapshotMinLogSize;
}

public void setSnapshotMinLogSize(int snapshotMinLogSize) {
this.snapshotMinLogSize = snapshotMinLogSize;
}

public int getMaxSnapshotBytesPerRequest() {
return maxSnapshotBytesPerRequest;
}

public void setMaxSnapshotBytesPerRequest(int maxSnapshotBytesPerRequest) {
this.maxSnapshotBytesPerRequest = maxSnapshotBytesPerRequest;
}

public int getMaxLogEntriesPerRequest() {
return maxLogEntriesPerRequest;
}

public void setMaxLogEntriesPerRequest(int maxLogEntriesPerRequest) {
this.maxLogEntriesPerRequest = maxLogEntriesPerRequest;
}

public int getMaxSegmentFileSize() {
return maxSegmentFileSize;
}

public void setMaxSegmentFileSize(int maxSegmentFileSize) {
this.maxSegmentFileSize = maxSegmentFileSize;
}

public long getCatchupMargin() {
return catchupMargin;
}

public void setCatchupMargin(long catchupMargin) {
this.catchupMargin = catchupMargin;
}

public long getMaxAwaitTimeout() {
return maxAwaitTimeout;
}

public void setMaxAwaitTimeout(long maxAwaitTimeout) {
this.maxAwaitTimeout = maxAwaitTimeout;
}

public int getRaftConsensusThreadNum() {
return raftConsensusThreadNum;
}

public void setRaftConsensusThreadNum(int raftConsensusThreadNum) {
this.raftConsensusThreadNum = raftConsensusThreadNum;
}

public String getDataDir() {
return dataDir;
}

public void setDataDir(String dataDir) {
this.dataDir = dataDir;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.service.RaftConsensusService;
import com.github.wenweihu86.raft.storage.Snapshot;
import com.github.wenweihu86.raft.util.ConfigurationUtils;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ public class SegmentedLog {

private static Logger LOG = LoggerFactory.getLogger(SegmentedLog.class);

private String logDir = RaftOptions.dataDir + File.separator + "log";
private String logDataDir = logDir + File.separator + "data";
private String logDir;
private String logDataDir;
private int maxSegmentFileSize;
private RaftMessage.LogMetaData metaData;
private TreeMap<Long, Segment> startLogIndexSegmentMap = new TreeMap<>();
// segment log占用的内存大小,用于判断是否需要做snapshot
private volatile long totalSize;

public SegmentedLog() {
public SegmentedLog(String raftDataDir, int maxSegmentFileSize) {
this.logDir = raftDataDir + File.separator + "log";
this.logDataDir = logDir + File.separator + "data";
this.maxSegmentFileSize = maxSegmentFileSize;
File file = new File(logDataDir);
if (!file.exists()) {
file.mkdirs();
Expand Down Expand Up @@ -100,7 +104,7 @@ public long append(List<RaftMessage.LogEntry> entries) {
Segment segment = startLogIndexSegmentMap.lastEntry().getValue();
if (!segment.isCanWrite()) {
isNeedNewSegmentFile = true;
} else if (segment.getFileSize() + entrySize >= RaftOptions.maxSegmentFileSize) {
} else if (segment.getFileSize() + entrySize >= maxSegmentFileSize) {
isNeedNewSegmentFile = true;
// 最后一个segment的文件close并改名
segment.getRandomAccessFile().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ public class SnapshotDataFile {
}

private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class);
private String snapshotDir = RaftOptions.dataDir + File.separator + "snapshot";
private String snapshotDir;
private RaftMessage.SnapshotMetaData metaData;
// 表示是否正在安装snapshot,leader向follower安装,leader和follower同时处于installSnapshot状态
private AtomicBoolean isInstallSnapshot = new AtomicBoolean(false);
// 表示节点自己是否在对状态机做snapshot
private AtomicBoolean isTakeSnapshot = new AtomicBoolean(false);
private Lock lock = new ReentrantLock();

public Snapshot() {
public Snapshot(String raftDataDir) {
this.snapshotDir = raftDataDir + File.separator + "snapshot";
String snapshotDataDir = snapshotDir + File.separator + "data";
File file = new File(snapshotDataDir);
if (!file.exists()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.wenweihu86.raft.storage;

import com.github.wenweihu86.raft.RaftOptions;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
Expand All @@ -19,9 +18,8 @@ public class SegmentedLogTest {

@Test
public void testTruncateSuffix() throws IOException {
RaftOptions.dataDir = "./data";
RaftOptions.maxSegmentFileSize = 32;
SegmentedLog segmentedLog = new SegmentedLog();
String raftDataDir = "./data";
SegmentedLog segmentedLog = new SegmentedLog(raftDataDir, 32);
Assert.assertTrue(segmentedLog.getFirstLogIndex() == 1);

List<RaftMessage.LogEntry> entries = new ArrayList<>();
Expand All @@ -38,6 +36,6 @@ public void testTruncateSuffix() throws IOException {
Assert.assertTrue(lastLogIndex == 9);

segmentedLog.truncatePrefix(5);
FileUtils.deleteDirectory(new File(RaftOptions.dataDir));
FileUtils.deleteDirectory(new File(raftDataDir));
}
}
Loading

0 comments on commit 7e6d26b

Please sign in to comment.