Skip to content

Commit

Permalink
#3 客户端增加请求处理器
Browse files Browse the repository at this point in the history
  • Loading branch information
vintagewang committed May 19, 2013
1 parent ee8efe5 commit 695185d
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void test_sendMessage() throws Exception {

brokerController.start();

MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig());
MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null);
client.start();

for (int i = 0; i < 100000; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
*
*/
package com.alibaba.rocketmq.client.impl;

import io.netty.channel.ChannelHandlerContext;

import org.slf4j.Logger;

import com.alibaba.rocketmq.client.impl.factory.MQClientFactory;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;


/**
* Client接收Broker的回调操作,例如事务回调,或者其他管理类命令回调
*
* @author [email protected] [email protected]
*/
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final Logger log;
private final MQClientFactory mqClientFactory;


public ClientRemotingProcessor(final MQClientFactory mqClientFactory, final Logger log) {
this.log = log;
this.mqClientFactory = mqClientFactory;
}


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// TODO Auto-generated method stub
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,19 @@ public class MQClientAPIImpl {
private final TopAddressing topAddressing = new TopAddressing();
private String nameSrvAddr = null;

private final ClientRemotingProcessor clientRemotingProcessor;

static {
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
}


public MQClientAPIImpl(final NettyClientConfig nettyClientConfig) {
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerProcessor(MQRequestCode.CHECK_TRANSACTION_STATE_VALUE,
this.clientRemotingProcessor, null);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.rocketmq.client.consumer.ConsumeFromWhichNode;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.impl.ClientRemotingProcessor;
import com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.alibaba.rocketmq.client.impl.MQAdminImpl;
import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
Expand Down Expand Up @@ -59,19 +60,21 @@ public class MQClientFactory {
private final String clientId;
private final long bootTimestamp = System.currentTimeMillis();

// Producer对象
private final ConcurrentHashMap<String/* group */, DefaultMQProducerImpl> producerTable =
new ConcurrentHashMap<String, DefaultMQProducerImpl>();

// Consumer对象
private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable =
new ConcurrentHashMap<String, MQConsumerInner>();

// Netty客户端配置
private final NettyClientConfig nettyClientConfig;
// RPC调用的封装类
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;

// 存储从Name Server拿到的Topic路由信息
private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable =
new ConcurrentHashMap<String, TopicRouteData>();

// 调用Name Server获取Topic路由信息时,加锁
private final Lock lockNamesrv = new ReentrantLock();
private final static long LockTimeoutMillis = 3000;
Expand All @@ -80,35 +83,37 @@ public class MQClientFactory {
private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();

private ScheduledExecutorService scheduledExecutorService = Executors
// 定时线程
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});

private final MQAdminImpl mQAdminImpl;
private final ClientRemotingProcessor clientRemotingProcessor;


public MQClientFactory(MQClientConfig mQClientConfig, int factoryIndex) {
this.mQClientConfig = mQClientConfig;
this.log = MixAll.createLogger(mQClientConfig.getLogFileName(), mQClientConfig.getLogLevel());
this.factoryIndex = factoryIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(mQClientConfig.getClientCallbackExecutorThreads());
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig);
this.log = MixAll.createLogger(mQClientConfig.getLogFileName(), mQClientConfig.getLogLevel());
this.clientRemotingProcessor = new ClientRemotingProcessor(this, this.log);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor);

if (this.mQClientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.mQClientConfig.getNamesrvAddr());
log.info("user specfied name server address, " + this.mQClientConfig.getNamesrvAddr());
log.info("user specfied name server address: {}", this.mQClientConfig.getNamesrvAddr());
}

this.clientId = this.buildMQClientId();

this.mQAdminImpl = new MQAdminImpl(this);

log.info("created a new client fatory, " + this.factoryIndex);
log.info("created a new client fatory, ", this.factoryIndex);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Lock lockNamesrvChannel = new ReentrantLock();

// 处理Callback应答器
private final ExecutorService callbackExecutor;
private final ExecutorService publicExecutor;

private final ChannelEventListener channelEventListener;

Expand Down Expand Up @@ -200,24 +200,21 @@ public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;

if (nettyClientConfig.getClientCallbackExecutorThreads() > 0) {
this.callbackExecutor =
Executors.newFixedThreadPool(nettyClientConfig.getClientCallbackExecutorThreads(),
new ThreadFactory() {
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}

private AtomicInteger threadIndex = new AtomicInteger(0);
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);


@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientCallbackExecutor_"
+ this.threadIndex.incrementAndGet());
}
});
}
else {
this.callbackExecutor = null;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});

}


Expand Down Expand Up @@ -319,9 +316,9 @@ public void shutdown() {
log.error("NettyRemotingClient shutdown exception, ", e);
}

if (this.callbackExecutor != null) {
if (this.publicExecutor != null) {
try {
this.callbackExecutor.shutdown();
this.publicExecutor.shutdown();
}
catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
Expand Down Expand Up @@ -625,7 +622,7 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli

@Override
public Executor getCallbackExecutor() {
return this.callbackExecutor;
return this.publicExecutor;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private NettyServerConfig nettyServerConfig;

// ´¦ÀíCallbackÓ¦´ðÆ÷
private final ExecutorService callbackExecutor;
private final ExecutorService publicExecutor;

private final ChannelEventListener channelEventListener;

Expand Down Expand Up @@ -143,24 +143,20 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;

if (nettyServerConfig.getServerCallbackExecutorThreads() > 0) {
this.callbackExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerCallbackExecutorThreads(),
new ThreadFactory() {
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}

private AtomicInteger threadIndex = new AtomicInteger(0);
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);


@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCallbackExecutor_"
+ this.threadIndex.incrementAndGet());
}
});
}
else {
this.callbackExecutor = null;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
}


Expand Down Expand Up @@ -203,9 +199,9 @@ public void shutdown() {
log.error("NettyRemotingServer shutdown exception, ", e);
}

if (this.callbackExecutor != null) {
if (this.publicExecutor != null) {
try {
this.callbackExecutor.shutdown();
this.publicExecutor.shutdown();
}
catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
Expand Down Expand Up @@ -248,7 +244,7 @@ public void invokeOneway(Channel channel, RemotingCommand request, long timeoutM

@Override
public Executor getCallbackExecutor() {
return this.callbackExecutor;
return this.publicExecutor;
}


Expand Down

0 comments on commit 695185d

Please sign in to comment.