Skip to content

Commit

Permalink
Merge pull request #205 from WeBankFinTech/1.2.0
Browse files Browse the repository at this point in the history
merge branch 1.2.0 to develop branch
  • Loading branch information
keranbingaa committed Jan 27, 2021
2 parents 3959ea4 + 080b57f commit 2643c9e
Show file tree
Hide file tree
Showing 10 changed files with 16 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,5 @@ public interface MeshMQPushConsumer extends PushConsumer {

void pause();

void setInstanceName(String instanceName);

AbstractContext getContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public synchronized void init(KeyValue keyValue) throws Exception {
boolean isBroadcast = Boolean.valueOf(keyValue.getString("isBroadcast"));
String consumerGroup = keyValue.getString("consumerGroup");
String proxyIDC = keyValue.getString("proxyIDC");
String instanceName = keyValue.getString("instanceName");

DeFiBusClientConfig wcc = new DeFiBusClientConfig();
wcc.setNamesrvAddr(clientConfiguration.namesrvAddr);
Expand All @@ -100,6 +101,7 @@ public synchronized void init(KeyValue keyValue) throws Exception {

deFiBusPushConsumer = new DeFiBusPushConsumer(wcc);
deFiBusPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
deFiBusPushConsumer.getDefaultMQPushConsumer().setInstanceName(instanceName);
if (isBroadcast) {
deFiBusPushConsumer.getDefaultMQPushConsumer().setMessageModel(MessageModel.BROADCASTING);

Expand Down Expand Up @@ -274,11 +276,6 @@ public synchronized void shutdown() {
deFiBusPushConsumer.shutdown();
}

@Override
public void setInstanceName(String instanceName) {
deFiBusPushConsumer.getDefaultMQPushConsumer().setInstanceName(instanceName);
}

@Override
public AbstractContext getContext() {
return this.context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer.setMessageModel(MessageModel.valueOf(clientConfig.getMessageModel()));

String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
//this.rocketmqPushConsumer.setInstanceName(consumerId);
this.rocketmqPushConsumer.setInstanceName(properties.getString("instanceName"));
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public synchronized void init(KeyValue keyValue) throws Exception {
clientConfiguration.init();
boolean isBroadcast = Boolean.valueOf(keyValue.getString("isBroadcast"));
String consumerGroup = keyValue.getString("consumerGroup");
String instanceName = keyValue.getString("instanceName");

if(isBroadcast){
consumerGroup = Constants.CONSUMER_GROUP_NAME_PREFIX + Constants.BROADCAST_PREFIX + consumerGroup;
Expand All @@ -72,12 +73,14 @@ public synchronized void init(KeyValue keyValue) throws Exception {

properties.put("ACCESS_POINTS", omsNamesrv)
.put("REGION", "namespace")
.put(OMSBuiltinKeys.CONSUMER_ID, consumerGroup);
.put(OMSBuiltinKeys.CONSUMER_ID, consumerGroup)
.put("instanceName", instanceName);
if (isBroadcast){
properties.put("MESSAGE_MODEL", MessageModel.BROADCASTING.name());
}else {
properties.put("MESSAGE_MODEL", MessageModel.CLUSTERING.name());
}

MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint(omsNamesrv, properties);
pushConsumer = (PushConsumerImpl)messagingAccessPoint.createPushConsumer();
}
Expand All @@ -87,11 +90,6 @@ public void subscribe(String topic, MessageListener listener) throws Exception {
pushConsumer.attachQueue(topic, listener);
}

@Override
public void setInstanceName(String instanceName) {
pushConsumer.getRocketmqPushConsumer().setInstanceName(instanceName);
}

@Override
public AbstractContext getContext() {
return pushConsumer.getContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.webank.eventmesh.common.config.CommonConfiguration;
import com.webank.eventmesh.common.config.ConfigurationWraper;
import org.apache.commons.lang3.StringUtils;
import com.webank.eventmesh.common.config.CommonConfiguration;
import com.webank.eventmesh.common.config.ConfigurationWraper;

public class AccessConfiguration extends CommonConfiguration {
public int proxyTcpServerPort = 10000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.webank.eventmesh.common.config.CommonConfiguration;
import com.webank.eventmesh.common.config.ConfigurationWraper;
import org.apache.commons.lang3.StringUtils;
import com.webank.eventmesh.common.config.CommonConfiguration;
import com.webank.eventmesh.common.config.ConfigurationWraper;

public class ProxyConfiguration extends CommonConfiguration {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ public class MQConsumerWrapper extends MQWrapper {

protected MeshMQPushConsumer meshMQPushConsumer;

public void setInstanceName(String instanceName) {

meshMQPushConsumer.setInstanceName(instanceName);
}

public void subscribe(String topic, MessageListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ public synchronized void init() throws Exception {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
keyValue.put("proxyIDC", proxyHTTPServer.getProxyConfiguration().proxyIDC);
keyValue.put("instanceName", ProxyUtil.buildProxyClientID(consumerGroupConf.getConsumerGroup(),
proxyHTTPServer.getProxyConfiguration().proxyRegion,
proxyHTTPServer.getProxyConfiguration().proxyCluster));
persistentMqConsumer.init(keyValue);

//
KeyValue broadcastKeyValue = OMS.newKeyValue();
broadcastKeyValue.put("isBroadcast", "true");
broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
broadcastKeyValue.put("proxyIDC", proxyHTTPServer.getProxyConfiguration().proxyIDC);
broadcastMqConsumer.init(broadcastKeyValue);
broadcastMqConsumer.setInstanceName(ProxyUtil.buildProxyClientID(consumerGroupConf.getConsumerGroup(),
proxyHTTPServer.getProxyConfiguration().proxyRegion,
proxyHTTPServer.getProxyConfiguration().proxyCluster));
persistentMqConsumer.setInstanceName(ProxyUtil.buildProxyClientID(consumerGroupConf.getConsumerGroup(),
broadcastKeyValue.put("instanceName", ProxyUtil.buildProxyClientID(consumerGroupConf.getConsumerGroup(),
proxyHTTPServer.getProxyConfiguration().proxyRegion,
proxyHTTPServer.getProxyConfiguration().proxyCluster));
broadcastMqConsumer.init(broadcastKeyValue);
inited4Persistent.compareAndSet(false, true);
inited4Broadcast.compareAndSet(false, true);
logger.info("ProxyConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,9 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", groupName);
keyValue.put("proxyIDC", accessConfiguration.proxyIDC);
keyValue.put("instanceName", ProxyUtil.buildProxyTcpClientID(sysId, dcn, "SUB", accessConfiguration.proxyCluster));

persistentMsgConsumer.init(keyValue);
persistentMsgConsumer.setInstanceName(ProxyUtil.buildProxyTcpClientID(sysId, dcn, "SUB", accessConfiguration.proxyCluster));
// persistentMsgConsumer.registerMessageListener(new ProxyMessageListenerConcurrently() {
//
// @Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ProxyTcpSendResult send(Header header, Message msg, SendCallback sendCall
upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
session.getClientGroupWrapper().get().request(upStreamMsgContext, sendCallback, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
} else if (Command.RESPONSE_TO_SERVER == cmd) {
String cluster = msg.sysHeaders().getString(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
String cluster = msg.userHeaders().getString(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
if (!StringUtils.isEmpty(cluster)) {
String replyTopic = DeFiBusConstant.RR_REPLY_TOPIC;
replyTopic = cluster + "-" + replyTopic;
Expand Down

0 comments on commit 2643c9e

Please sign in to comment.