Skip to content

Commit

Permalink
Broker端增加注销Consumer功能
Browse files Browse the repository at this point in the history
  • Loading branch information
vintagewang committed May 19, 2013
1 parent 82d48a8 commit dc5ec5b
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel
}


public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel().id());
if (old != null) {
log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
}
}


public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
final ClientChannelInfo info = this.channelInfoTable.remove(channel.id());
if (info != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,12 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
boolean r2 = consumerGroupInfo.updateSubscription(subList);
return r1 || r2;
}


public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,66 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
log.error("", e);
}
}


public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
if (this.hashcodeChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
List<ClientChannelInfo> clientChannelInfoList =
this.hashcodeChannelTable.get(group.hashCode());
if (null != clientChannelInfoList && !clientChannelInfoList.isEmpty()) {
boolean result = clientChannelInfoList.remove(clientChannelInfo);
if (result) {
log.info("unregister a producer[{}] from hashcodeChannelTable {}", group,
clientChannelInfo.toString());
}

if (clientChannelInfoList.isEmpty()) {
this.hashcodeChannelTable.remove(group.hashCode());
log.info("unregister a producer group[{}] from hashcodeChannelTable", group);
}
}
}
finally {
this.hashcodeChannelLock.unlock();
}
}
else {
log.warn("ProducerManager unregisterProducer lock timeout");
}
}
catch (InterruptedException e) {
log.error("", e);
}

try {
if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
HashMap<Integer, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel().id());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
}

if (channelTable.isEmpty()) {
this.hashcodeChannelTable.remove(group.hashCode());
log.info("unregister a producer group[{}] from groupChannelTable", group);
}
}
}
finally {
this.groupChannelLock.unlock();
}
}
else {
log.warn("ProducerManager registerProducer lock timeout");
}
}
catch (InterruptedException e) {
log.error("", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.protocol.MQProtos.MQRequestCode;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData;
import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.remoting.protocol.RemotingProtos.ResponseCode;
Expand All @@ -40,20 +45,56 @@ public ClientManageProcessor(final BrokerController brokerController) {


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
MQRequestCode code = MQRequestCode.valueOf(request.getCode());
switch (code) {
case HEART_BEAT:
return this.heartBeat(ctx, request);
case UNREGISTER_CLIENT:
break;
return this.unregisterClient(ctx, request);
default:
break;
}
return null;
}


public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
final UnregisterClientResponseHeader responseHeader =
(UnregisterClientResponseHeader) response.getCustomHeader();
final UnregisterClientRequestHeader requestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);

ClientChannelInfo clientChannelInfo = new ClientChannelInfo(//
ctx.channel(),//
requestHeader.getClientID(),//
request.getLanguage(),//
request.getVersion()//
);

// 注销Producer
final String producerGroup = requestHeader.getProducerGroup();
if (producerGroup != null) {
this.brokerController.getProducerManager().unregisterProducer(producerGroup, clientChannelInfo);
}

// 注销Consumer
final String consumerGroup = requestHeader.getProducerGroup();
if (consumerGroup != null) {
this.brokerController.getConsumerManager().unregisterConsumer(consumerGroup, clientChannelInfo);
}

response.setCode(ResponseCode.SUCCESS_VALUE);
response.setRemark(null);
return response;
}


public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
*
*/
package com.alibaba.rocketmq.common.protocol.header;

import com.alibaba.rocketmq.remoting.CommandCustomHeader;
import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
import com.alibaba.rocketmq.remoting.annotation.CFNullable;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;


/**
* @author [email protected] [email protected]
*/
public class UnregisterClientRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clientID;

@CFNullable
private String producerGroup;
@CFNullable
private String consumerGroup;


public String getClientID() {
return clientID;
}


public void setClientID(String clientID) {
this.clientID = clientID;
}


public String getProducerGroup() {
return producerGroup;
}


public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}


public String getConsumerGroup() {
return consumerGroup;
}


public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}


@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
*
*/
package com.alibaba.rocketmq.common.protocol.header;

import com.alibaba.rocketmq.remoting.CommandCustomHeader;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;

/**
* @author [email protected] [email protected]
*/
public class UnregisterClientResponseHeader implements CommandCustomHeader {

/* (non-Javadoc)
* @see com.alibaba.rocketmq.remoting.CommandCustomHeader#checkFields()
*/
@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub

}

}

0 comments on commit dc5ec5b

Please sign in to comment.