Skip to content

Commit

Permalink
update async call rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Oct 29, 2017
1 parent a1c43d4 commit 82b9829
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 4 deletions.
2 changes: 1 addition & 1 deletion raft-java-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
<dependency>
<groupId>com.github.wenweihu86.rpc</groupId>
<artifactId>rpc-java</artifactId>
<version>1.6.0</version>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.service.RaftConsensusService;
import com.github.wenweihu86.raft.service.RaftConsensusServiceAsync;
import com.github.wenweihu86.rpc.client.EndPoint;
import com.github.wenweihu86.rpc.client.RPCClient;
import com.github.wenweihu86.rpc.client.RPCProxy;
Expand All @@ -13,6 +14,7 @@ public class Peer {
private RaftMessage.Server server;
private RPCClient rpcClient;
private RaftConsensusService raftConsensusService;
private RaftConsensusServiceAsync raftConsensusServiceAsync;
// 需要发送给follower的下一个日志条目的索引值,只对leader有效
private long nextIndex;
// 已复制日志的最高索引值
Expand All @@ -26,6 +28,7 @@ public Peer(RaftMessage.Server server) {
server.getEndPoint().getHost(),
server.getEndPoint().getPort()));
raftConsensusService = RPCProxy.getProxy(rpcClient, RaftConsensusService.class);
raftConsensusServiceAsync = RPCProxy.getProxy(rpcClient, RaftConsensusServiceAsync.class);
isCatchUp = false;
}

Expand All @@ -41,6 +44,10 @@ public RaftConsensusService getRaftConsensusService() {
return raftConsensusService;
}

public RaftConsensusServiceAsync getRaftConsensusServiceAsync() {
return raftConsensusServiceAsync;
}

public long getNextIndex() {
return nextIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,8 @@ private void requestVote(Peer peer) {
}

RaftMessage.VoteRequest request = requestBuilder.build();
peer.getRpcClient().asyncCall(
"RaftConsensusService.requestVote", request,
new VoteResponseCallback(peer, request));
peer.getRaftConsensusServiceAsync().requestVote(
request, new VoteResponseCallback(peer, request));
}

private class VoteResponseCallback implements RPCCallback<RaftMessage.VoteResponse> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.github.wenweihu86.raft.service;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.rpc.client.RPCCallback;

import java.util.concurrent.Future;

/**
* 用于生成client异步调用所需的proxy
* Created by wenweihu86 on 2017/5/14.
*/
public interface RaftClientServiceAsync extends RaftClientService {

Future<RaftMessage.GetLeaderResponse> getLeader(
RaftMessage.GetLeaderRequest request,
RPCCallback<RaftMessage.GetLeaderResponse> callback);

Future<RaftMessage.GetConfigurationResponse> getConfiguration(
RaftMessage.GetConfigurationRequest request,
RPCCallback<RaftMessage.GetConfigurationResponse> callback);

Future<RaftMessage.AddPeersResponse> addPeers(
RaftMessage.AddPeersRequest request,
RPCCallback<RaftMessage.AddPeersResponse> callback);

Future<RaftMessage.RemovePeersResponse> removePeers(
RaftMessage.RemovePeersRequest request,
RPCCallback<RaftMessage.RemovePeersResponse> callback);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.wenweihu86.raft.service;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.rpc.client.RPCCallback;

import java.util.concurrent.Future;

/**
* 用于生成client异步调用所需的proxy
* Created by wenweihu86 on 2017/5/2.
*/
public interface RaftConsensusServiceAsync extends RaftConsensusService {

Future<RaftMessage.VoteResponse> requestVote(
RaftMessage.VoteRequest request,
RPCCallback<RaftMessage.VoteResponse> callback);

Future<RaftMessage.AppendEntriesResponse> appendEntries(
RaftMessage.AppendEntriesRequest request,
RPCCallback<RaftMessage.AppendEntriesResponse> callback);

Future<RaftMessage.InstallSnapshotResponse> installSnapshot(
RaftMessage.InstallSnapshotRequest request,
RPCCallback<RaftMessage.InstallSnapshotResponse> callback);
}

0 comments on commit 82b9829

Please sign in to comment.