Skip to content

Commit

Permalink
add crc32 and update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 6, 2017
1 parent 4e9bb50 commit fd506e3
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 11 deletions.
102 changes: 101 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,109 @@
# raft-java
Just another Raft implementation for Java.<br>
参考自[raft论文](https://github.com/maemual/raft-zh_cn)和raft作者的开源raft实现[logcabin](https://github.com/logcabin/logcabin)
参考自[Raft论文](https://github.com/maemual/raft-zh_cn)和Raft作者的开源实现[LogCabin](https://github.com/logcabin/logcabin)

# 支持的功能
* leader选举
* 日志复制
* snapshot
* 集群成员动态更变

# 使用方法
## 配置依赖
```
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.0.0</version>
</dependency>
```

## 定义数据写入和读取接口
```protobuf
message SetRequest {
string key = 1;
string value = 2;
}
message SetResponse {
bool success = 1;
}
message GetRequest {
string key = 1;
}
message GetResponse {
string value = 1;
}
```
```java
public interface ExampleService {
Example.SetResponse set(Example.SetRequest request);
Example.GetResponse get(Example.GetRequest request);
}
```

## 服务端使用方法
1. 实现状态机StateMachine接口实现类
```java
// 该接口三个方法主要是给Raft内部调用
public interface StateMachine {
/**
* 对状态机中数据进行snapshot,每个节点本地定时调用
* @param snapshotDir snapshot数据输出目录
*/
void writeSnapshot(String snapshotDir);
/**
* 读取snapshot到状态机,节点启动时调用
* @param snapshotDir snapshot数据目录
*/
void readSnapshot(String snapshotDir);
/**
* 将数据应用到状态机
* @param dataBytes 数据二进制
*/
void apply(byte[] dataBytes);
}
```

2. 实现数据写入和读取接口
```
// ExampleService实现类中需要包含以下成员
private RaftNode raftNode;
private ExampleStateMachine stateMachine;
```
```
// 数据写入主要逻辑
byte[] data = request.toByteArray();
// 数据同步写入raft集群
boolean success = raftNode.replicate(data, Raft.EntryType.ENTRY_TYPE_DATA);
Example.SetResponse response = Example.SetResponse.newBuilder().setSuccess(success).build();
```
```
// 数据读取主要逻辑,由具体应用状态机实现
Example.GetResponse response = stateMachine.get(request);
```

3. 服务端启动逻辑
```
// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 应用状态机
ExampleStateMachine stateMachine = new ExampleStateMachine();
// 设置Raft选项,比如:
RaftOptions.snapshotMinLogSize = 10 * 1024;
RaftOptions.snapshotPeriodSeconds = 30;
RaftOptions.maxSegmentFileSize = 1024 * 1024;
// 初始化RaftNode
RaftNode raftNode = new RaftNode(serverList, localServer, stateMachine);
// 注册Raft节点之间相互调用的服务
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);
// 注册给Client调用的Raft服务
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// 注册应用自己提供的服务
ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine);
server.registerService(exampleService);
// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();
```
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>pom</packaging>

<name>raft-java-parent</name>
Expand Down
2 changes: 1 addition & 1 deletion raft-java-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-admin</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>raft-java-admin</name>
Expand Down
86 changes: 85 additions & 1 deletion raft-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,100 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>raft-java-core</name>
<url>http:https://maven.apache.org</url>
<description>another Raft implementation for Java</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http:https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<developers>
<developer>
<name>wenweihu86</name>
<email>[email protected]</email>
</developer>
</developers>

<scm>
<connection>scm:git:https://github.com/wenweihu86/rpc-java</connection>
<developerConnection>scm:git:[email protected]:wenweihu86/rpc-java.git</developerConnection>
<url>https://github.com/wenweihu86/rpc-java</url>
<tag>1.0.0</tag>
</scm>

<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<!-- Source -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Javadoc -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- GPG -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<snapshotRepository>
<id>snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</snapshotRepository>
<repository>
<id>releases</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>
</profile>
</profiles>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package com.github.wenweihu86.raft;

/**
* Raft状态机接口类
* Created by wenweihu86 on 2017/5/10.
*/
public interface StateMachine {

/**
* 对状态机中数据进行snapshot,每个节点本地定时调用
* @param snapshotDir snapshot数据输出目录
*/
void writeSnapshot(String snapshotDir);

/**
* 读取snapshot到状态机,节点启动时调用
* @param snapshotDir snapshot数据目录
*/
void readSnapshot(String snapshotDir);

/**
* 将数据应用到状态机
* @param dataBytes 数据二进制
*/
void apply(byte[] dataBytes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ public void loadSegmentData(Segment segment) {
long offset = 0;
while (offset < totalLength) {
Raft.LogEntry entry = RaftFileUtils.readProtoFromFile(randomAccessFile, Raft.LogEntry.class);
if (entry == null) {
throw new RuntimeException("read segment log failed");
}
Segment.Record record = new Segment.Record(offset, entry);
segment.getEntries().add(record);
offset = randomAccessFile.getFilePointer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

/**
* Created by wenweihu86 on 2017/5/6.
Expand Down Expand Up @@ -85,8 +86,9 @@ public static void closeFile(FileOutputStream outputStream) {

public static <T extends GeneratedMessageV3> T readProtoFromFile(RandomAccessFile raf, Class<T> clazz) {
try {
long crc32FromFile = raf.readLong();
int dataLen = raf.readInt();
int hasReadLen = Integer.SIZE / Byte.SIZE;
int hasReadLen = (Long.SIZE + Integer.SIZE) / Byte.SIZE;
if (raf.length() - hasReadLen < dataLen) {
LOG.warn("file remainLength < dataLen");
return null;
Expand All @@ -97,6 +99,11 @@ public static <T extends GeneratedMessageV3> T readProtoFromFile(RandomAccessFil
LOG.warn("readLen != dataLen");
return null;
}
long crc32FromData = getCRC32(data);
if (crc32FromFile != crc32FromData) {
LOG.warn("crc32 check failed");
return null;
}
Method method = clazz.getMethod("parseFrom", byte[].class);
T message = (T) method.invoke(clazz, data);
return message;
Expand All @@ -108,7 +115,9 @@ public static <T extends GeneratedMessageV3> T readProtoFromFile(RandomAccessFil

public static <T extends GeneratedMessageV3> void writeProtoToFile(RandomAccessFile raf, T message) {
byte[] messageBytes = message.toByteArray();
long crc32 = getCRC32(messageBytes);
try {
raf.writeLong(crc32);
raf.writeInt(messageBytes.length);
raf.write(messageBytes);
} catch (IOException ex) {
Expand All @@ -117,4 +126,10 @@ public static <T extends GeneratedMessageV3> void writeProtoToFile(RandomAccess
}
}

public static long getCRC32(byte[] data) {
CRC32 crc32 = new CRC32();
crc32.update(data);
return crc32.getValue();
}

}
2 changes: 1 addition & 1 deletion raft-java-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-example</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>raft-java-example</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,27 @@ public static void main(String[] args) {
// local server
Raft.Server localServer = parseServer(args[1]);

// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());

// 应用状态机
ExampleStateMachine stateMachine = new ExampleStateMachine();
// 设置Raft选项,比如:
// just for test snapshot
RaftOptions.snapshotMinLogSize = 10 * 1024;
RaftOptions.snapshotPeriodSeconds = 30;
RaftOptions.maxSegmentFileSize = 1024 * 1024;
// 初始化RaftNode
RaftNode raftNode = new RaftNode(serverList, localServer, stateMachine);

// 注册Raft节点之间相互调用的服务
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);

// 注册给Client调用的Raft服务
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);

// 注册应用自己提供的服务
ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine);
server.registerService(exampleService);

// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public ExampleServiceImpl(RaftNode raftNode, ExampleStateMachine stateMachine) {
@Override
public Example.SetResponse set(Example.SetRequest request) {
byte[] data = request.toByteArray();
// 数据同步写入raft集群
boolean success = raftNode.replicate(data, Raft.EntryType.ENTRY_TYPE_DATA);
Example.SetResponse response = Example.SetResponse.newBuilder()
.setSuccess(success).build();
Expand Down

0 comments on commit fd506e3

Please sign in to comment.