Skip to content

Commit

Permalink
update rpc-java to brpc-java
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Aug 17, 2019
1 parent 8722482 commit bd706cf
Show file tree
Hide file tree
Showing 32 changed files with 6,303 additions and 6,840 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-all</artifactId>
<version>1.8.0</version>
<version>1.9.0</version>
<packaging>pom</packaging>

<name>raft-java-all</name>
<name>raft-java-parent</name>
<url>https://maven.apache.org</url>

<properties>
Expand Down
9 changes: 7 additions & 2 deletions raft-java-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-admin</artifactId>
<version>1.8.0</version>
<version>1.9.0</version>
<packaging>jar</packaging>

<name>raft-java-admin</name>
Expand Down Expand Up @@ -34,7 +34,12 @@
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.8.0</version>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.github.wenweihu86.raft.admin;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.proto.RaftProto;
import com.github.wenweihu86.raft.service.RaftClientService;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.googlecode.protobuf.format.JsonFormat;
import org.apache.commons.lang3.Validate;

import java.util.ArrayList;
Expand All @@ -13,7 +12,7 @@
* Created by wenweihu86 on 2017/5/14.
*/
public class AdminMain {
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
private static final JsonFormat jsonFormat = new JsonFormat();

public static void main(String[] args) {
// parse args
Expand All @@ -31,33 +30,29 @@ public static void main(String[] args) {
|| subCmd.equals("del"));
RaftClientService client = new RaftClientServiceProxy(servers);
if (subCmd.equals("get")) {
RaftMessage.GetConfigurationRequest request = RaftMessage.GetConfigurationRequest.newBuilder().build();
RaftMessage.GetConfigurationResponse response = client.getConfiguration(request);
try {
if (response != null) {
System.out.println(PRINTER.print(response));
} else {
System.out.printf("response == null");
}
} catch (InvalidProtocolBufferException ex) {
ex.printStackTrace();
RaftProto.GetConfigurationRequest request = RaftProto.GetConfigurationRequest.newBuilder().build();
RaftProto.GetConfigurationResponse response = client.getConfiguration(request);
if (response != null) {
System.out.println(jsonFormat.printToString(response));
} else {
System.out.printf("response == null");
}

} else if (subCmd.equals("add")) {
List<RaftMessage.Server> serverList = parseServers(args[3]);
RaftMessage.AddPeersRequest request = RaftMessage.AddPeersRequest.newBuilder()
List<RaftProto.Server> serverList = parseServers(args[3]);
RaftProto.AddPeersRequest request = RaftProto.AddPeersRequest.newBuilder()
.addAllServers(serverList).build();
RaftMessage.AddPeersResponse response = client.addPeers(request);
RaftProto.AddPeersResponse response = client.addPeers(request);
if (response != null) {
System.out.println(response.getResCode());
} else {
System.out.printf("response == null");
}
} else if (subCmd.equals("del")) {
List<RaftMessage.Server> serverList = parseServers(args[3]);
RaftMessage.RemovePeersRequest request = RaftMessage.RemovePeersRequest.newBuilder()
List<RaftProto.Server> serverList = parseServers(args[3]);
RaftProto.RemovePeersRequest request = RaftProto.RemovePeersRequest.newBuilder()
.addAllServers(serverList).build();
RaftMessage.RemovePeersResponse response = client.removePeers(request);
RaftProto.RemovePeersResponse response = client.removePeers(request);
if (response != null) {
System.out.println(response.getResCode());
} else {
Expand All @@ -67,16 +62,16 @@ public static void main(String[] args) {
((RaftClientServiceProxy) client).stop();
}

public static List<RaftMessage.Server> parseServers(String serversString) {
List<RaftMessage.Server> serverList = new ArrayList<>();
public static List<RaftProto.Server> parseServers(String serversString) {
List<RaftProto.Server> serverList = new ArrayList<>();
String[] splitArray1 = serversString.split(",");
for (String addr : splitArray1) {
String[] splitArray2 = addr.split(":");
RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder()
RaftProto.Endpoint endPoint = RaftProto.Endpoint.newBuilder()
.setHost(splitArray2[0])
.setPort(Integer.parseInt(splitArray2[1])).build();
RaftMessage.Server server = RaftMessage.Server.newBuilder()
.setEndPoint(endPoint)
RaftProto.Server server = RaftProto.Server.newBuilder()
.setEndpoint(endPoint)
.setServerId(Integer.parseInt(splitArray2[2])).build();
serverList.add(server);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.github.wenweihu86.raft.admin;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.baidu.brpc.client.BrpcProxy;
import com.baidu.brpc.client.RpcClient;
import com.baidu.brpc.client.RpcClientOptions;
import com.baidu.brpc.client.instance.Endpoint;
import com.github.wenweihu86.raft.proto.RaftProto;
import com.github.wenweihu86.raft.service.RaftClientService;
import com.github.wenweihu86.rpc.client.EndPoint;
import com.github.wenweihu86.rpc.client.RPCClient;
import com.github.wenweihu86.rpc.client.RPCClientOptions;
import com.github.wenweihu86.rpc.client.RPCProxy;
import com.google.protobuf.util.JsonFormat;
import com.googlecode.protobuf.format.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,52 +18,52 @@
*/
public class RaftClientServiceProxy implements RaftClientService {
private static final Logger LOG = LoggerFactory.getLogger(RaftClientServiceProxy.class);
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
private static final JsonFormat jsonFormat = new JsonFormat();

private List<RaftMessage.Server> cluster;
private RPCClient clusterRPCClient;
private List<RaftProto.Server> cluster;
private RpcClient clusterRPCClient;
private RaftClientService clusterRaftClientService;

private RaftMessage.Server leader;
private RPCClient leaderRPCClient;
private RaftProto.Server leader;
private RpcClient leaderRPCClient;
private RaftClientService leaderRaftClientService;

private RPCClientOptions rpcClientOptions = new RPCClientOptions();
private RpcClientOptions rpcClientOptions = new RpcClientOptions();

// servers format is 10.1.1.1:8888,10.2.2.2:9999
public RaftClientServiceProxy(String ipPorts) {
rpcClientOptions.setConnectTimeoutMillis(1000); // 1s
rpcClientOptions.setReadTimeoutMillis(3600000); // 1hour
rpcClientOptions.setWriteTimeoutMillis(1000); // 1s
clusterRPCClient = new RPCClient(ipPorts, rpcClientOptions);
clusterRaftClientService = RPCProxy.getProxy(clusterRPCClient, RaftClientService.class);
clusterRPCClient = new RpcClient(ipPorts, rpcClientOptions);
clusterRaftClientService = BrpcProxy.getProxy(clusterRPCClient, RaftClientService.class);
updateConfiguration();
}

@Override
public RaftMessage.GetLeaderResponse getLeader(RaftMessage.GetLeaderRequest request) {
public RaftProto.GetLeaderResponse getLeader(RaftProto.GetLeaderRequest request) {
return clusterRaftClientService.getLeader(request);
}

@Override
public RaftMessage.GetConfigurationResponse getConfiguration(RaftMessage.GetConfigurationRequest request) {
public RaftProto.GetConfigurationResponse getConfiguration(RaftProto.GetConfigurationRequest request) {
return clusterRaftClientService.getConfiguration(request);
}

@Override
public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request) {
RaftMessage.AddPeersResponse response = leaderRaftClientService.addPeers(request);
if (response != null && response.getResCode() == RaftMessage.ResCode.RES_CODE_NOT_LEADER) {
public RaftProto.AddPeersResponse addPeers(RaftProto.AddPeersRequest request) {
RaftProto.AddPeersResponse response = leaderRaftClientService.addPeers(request);
if (response != null && response.getResCode() == RaftProto.ResCode.RES_CODE_NOT_LEADER) {
updateConfiguration();
response = leaderRaftClientService.addPeers(request);
}
return response;
}

@Override
public RaftMessage.RemovePeersResponse removePeers(RaftMessage.RemovePeersRequest request) {
RaftMessage.RemovePeersResponse response = leaderRaftClientService.removePeers(request);
if (response != null && response.getResCode() == RaftMessage.ResCode.RES_CODE_NOT_LEADER) {
public RaftProto.RemovePeersResponse removePeers(RaftProto.RemovePeersRequest request) {
RaftProto.RemovePeersResponse response = leaderRaftClientService.removePeers(request);
if (response != null && response.getResCode() == RaftProto.ResCode.RES_CODE_NOT_LEADER) {
updateConfiguration();
response = leaderRaftClientService.removePeers(request);
}
Expand All @@ -80,22 +80,22 @@ public void stop() {
}

private boolean updateConfiguration() {
RaftMessage.GetConfigurationRequest request = RaftMessage.GetConfigurationRequest.newBuilder().build();
RaftMessage.GetConfigurationResponse response = clusterRaftClientService.getConfiguration(request);
if (response != null && response.getResCode() == RaftMessage.ResCode.RES_CODE_SUCCESS) {
RaftProto.GetConfigurationRequest request = RaftProto.GetConfigurationRequest.newBuilder().build();
RaftProto.GetConfigurationResponse response = clusterRaftClientService.getConfiguration(request);
if (response != null && response.getResCode() == RaftProto.ResCode.RES_CODE_SUCCESS) {
if (leaderRPCClient != null) {
leaderRPCClient.stop();
}
leader = response.getLeader();
leaderRPCClient = new RPCClient(convertEndPoint(leader.getEndPoint()), rpcClientOptions);
leaderRaftClientService = RPCProxy.getProxy(leaderRPCClient, RaftClientService.class);
leaderRPCClient = new RpcClient(convertEndPoint(leader.getEndpoint()), rpcClientOptions);
leaderRaftClientService = BrpcProxy.getProxy(leaderRPCClient, RaftClientService.class);
return true;
}
return false;
}

private EndPoint convertEndPoint(RaftMessage.EndPoint endPoint) {
return new EndPoint(endPoint.getHost(), endPoint.getPort());
private Endpoint convertEndPoint(RaftProto.Endpoint endPoint) {
return new Endpoint(endPoint.getHost(), endPoint.getPort());
}

}
8 changes: 4 additions & 4 deletions raft-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.8.0</version>
<version>1.9.0</version>
<packaging>jar</packaging>

<name>raft-java-core</name>
Expand Down Expand Up @@ -115,9 +115,9 @@

<dependencies>
<dependency>
<groupId>com.github.wenweihu86.rpc</groupId>
<artifactId>rpc-java</artifactId>
<version>1.8.0</version>
<groupId>com.baidu</groupId>
<artifactId>brpc-java</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
33 changes: 13 additions & 20 deletions raft-java-core/src/main/java/com/github/wenweihu86/raft/Peer.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.github.wenweihu86.raft;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.service.RaftConsensusService;
import com.baidu.brpc.client.BrpcProxy;
import com.baidu.brpc.client.RpcClient;
import com.baidu.brpc.client.instance.Endpoint;
import com.github.wenweihu86.raft.proto.RaftProto;
import com.github.wenweihu86.raft.service.RaftConsensusServiceAsync;
import com.github.wenweihu86.rpc.client.EndPoint;
import com.github.wenweihu86.rpc.client.RPCClient;
import com.github.wenweihu86.rpc.client.RPCProxy;

/**
* Created by wenweihu86 on 2017/5/5.
*/
public class Peer {
private RaftMessage.Server server;
private RPCClient rpcClient;
private RaftConsensusService raftConsensusService;
private RaftProto.Server server;
private RpcClient rpcClient;
private RaftConsensusServiceAsync raftConsensusServiceAsync;
// 需要发送给follower的下一个日志条目的索引值,只对leader有效
private long nextIndex;
Expand All @@ -22,28 +20,23 @@ public class Peer {
private volatile Boolean voteGranted;
private volatile boolean isCatchUp;

public Peer(RaftMessage.Server server) {
public Peer(RaftProto.Server server) {
this.server = server;
this.rpcClient = new RPCClient(new EndPoint(
server.getEndPoint().getHost(),
server.getEndPoint().getPort()));
raftConsensusService = RPCProxy.getProxy(rpcClient, RaftConsensusService.class);
raftConsensusServiceAsync = RPCProxy.getProxy(rpcClient, RaftConsensusServiceAsync.class);
this.rpcClient = new RpcClient(new Endpoint(
server.getEndpoint().getHost(),
server.getEndpoint().getPort()));
raftConsensusServiceAsync = BrpcProxy.getProxy(rpcClient, RaftConsensusServiceAsync.class);
isCatchUp = false;
}

public RaftMessage.Server getServer() {
public RaftProto.Server getServer() {
return server;
}

public RPCClient getRpcClient() {
public RpcClient getRpcClient() {
return rpcClient;
}

public RaftConsensusService getRaftConsensusService() {
return raftConsensusService;
}

public RaftConsensusServiceAsync getRaftConsensusServiceAsync() {
return raftConsensusServiceAsync;
}
Expand Down
Loading

0 comments on commit bd706cf

Please sign in to comment.