Skip to content

Commit

Permalink
[ISSUE #68]Use multithreading for topic data collection in collectTask (
Browse files Browse the repository at this point in the history
#69)

* [ISSUE #68]Use multithreading for topic data collection in collectTask

* modify ut

* Optimize exception log printing

Co-authored-by: zhangjidi <[email protected]>
  • Loading branch information
zhangjidi2016 and zhangjidi committed Feb 11, 2022
1 parent d2da9ca commit 653332d
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.dashboard.admin;

import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
Expand All @@ -33,6 +34,8 @@ public MQAdminFactory(RMQConfigure rmqConfigure) {
this.rmqConfigure = rmqConfigure;
}

private final AtomicLong adminIndex = new AtomicLong(0);

public MQAdminExt getInstance() throws Exception {
RPCHook rpcHook = null;
final String accessKey = rmqConfigure.getAccessKey();
Expand All @@ -47,6 +50,7 @@ public MQAdminExt getInstance() throws Exception {
} else {
mqAdminExt = new DefaultMQAdminExt(rpcHook, rmqConfigure.getTimeoutMillis());
}
mqAdminExt.setAdminExtGroup(mqAdminExt.getAdminExtGroup() + "_" + adminIndex.getAndIncrement());
mqAdminExt.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel()));
mqAdminExt.setUseTLS(rmqConfigure.isUseTLS());
mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.dashboard.config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "threadpool.config")
@Data
public class CollectExecutorConfig {
private int coreSize = 20;
private int maxSize = 20;
private long keepAliveTime = 3000L;
private int queueSize = 1000;

@Bean(name = "collectExecutor")
public ExecutorService collectExecutor(CollectExecutorConfig collectExecutorConfig) {
ExecutorService collectExecutor = new ThreadPoolExecutor(
collectExecutorConfig.getCoreSize(),
collectExecutorConfig.getMaxSize(),
collectExecutorConfig.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(collectExecutorConfig.getQueueSize()),
new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "collectTopicThread_" + this.threadIndex.incrementAndGet());
}
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return collectExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.dashboard.task;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;

@Slf4j
public class CollectTaskRunnble implements Runnable {

private String topic;

private MQAdminExt mqAdminExt;

private DashboardCollectService dashboardCollectService;

public CollectTaskRunnble(String topic, MQAdminExt mqAdminExt,
DashboardCollectService dashboardCollectService) {
this.topic = topic;
this.mqAdminExt = mqAdminExt;
this.dashboardCollectService = dashboardCollectService;
}

@Override
public void run() {
Date date = new Date();
try {
TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);
double inTPS = 0;
long inMsgCntToday = 0;
double outTPS = 0;
long outMsgCntToday = 0;
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
try {
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
inTPS += bsd.getStatsMinute().getTps();
inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
} catch (Exception e) {
log.warn("Exception caught: mqAdminExt get broker stats data TOPIC_PUT_NUMS failed", e.getMessage());
}
}
}
if (groupList != null && !groupList.getGroupList().isEmpty()) {
for (String group : groupList.getGroupList()) {
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
try {
String statsKey = String.format("%s@%s", topic, group);
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
outTPS += bsd.getStatsMinute().getTps();
outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
} catch (Exception e) {
log.warn("Exception caught: mqAdminExt get broker stats data GROUP_GET_NUMS failed", e.getMessage());
}
}
}
}
}

List<String> list;
try {
list = dashboardCollectService.getTopicMap().get(topic);
} catch (ExecutionException e) {
throw Throwables.propagate(e);
}
if (null == list) {
list = Lists.newArrayList();
}

list.add(date.getTime() + "," + new BigDecimal(inTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + inMsgCntToday + "," + new BigDecimal(outTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + outMsgCntToday);
dashboardCollectService.getTopicMap().put(topic, list);
} catch (Exception e) {
log.error("Failed to collect topic: {} data", topic, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@
*/
package org.apache.rocketmq.dashboard.task;

import com.google.common.base.Stopwatch;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import com.google.common.base.Throwables;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
Expand All @@ -42,13 +31,18 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.service.DashboardCollectService;
import org.apache.rocketmq.dashboard.util.JsonUtil;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -67,14 +61,14 @@ public class DashboardCollectTask {

private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class);

@Resource
private ExecutorService collectExecutor;

@Scheduled(cron = "30 0/1 * * * ?")
public void collectTopic() {
if (!rmqConfigure.isEnableDashBoardCollect()) {
return;
}

Date date = new Date();
Stopwatch stopwatch = Stopwatch.createUnstarted();
try {
TopicList topicList = mqAdminExt.fetchAllTopicList();
Set<String> topicSet = topicList.getTopicList();
Expand All @@ -85,77 +79,9 @@ public void collectTopic() {
|| TopicValidator.isSystemTopic(topic)) {
continue;
}
TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);

GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic);

double inTPS = 0;

long inMsgCntToday = 0;

double outTPS = 0;

long outMsgCntToday = 0;

for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
try {
stopwatch.start();
log.info("start time: {}", stopwatch.toString());
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
stopwatch.stop();
log.info("stop time : {}", stopwatch.toString());

inTPS += bsd.getStatsMinute().getTps();
inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
}
catch (Exception e) {
stopwatch.reset();
log.warn("Exception caught: mqAdminExt get broker stats data TOPIC_PUT_NUMS failed");
log.warn("Response [{}] ", e.getMessage());
}
}
}

if (groupList != null && !groupList.getGroupList().isEmpty()) {

for (String group : groupList.getGroupList()) {
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
try {
String statsKey = String.format("%s@%s", topic, group);
BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
outTPS += bsd.getStatsMinute().getTps();
outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd);
}
catch (Exception e) {
log.warn("Exception caught: mqAdminExt get broker stats data GROUP_GET_NUMS failed");
log.warn("Response [{}] ", e.getMessage());
}
}
}
}
}

List<String> list;
try {
list = dashboardCollectService.getTopicMap().get(topic);
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
if (null == list) {
list = Lists.newArrayList();
}

list.add(date.getTime() + "," + new BigDecimal(inTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + inMsgCntToday + "," + new BigDecimal(outTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + outMsgCntToday);
dashboardCollectService.getTopicMap().put(topic, list);

CollectTaskRunnble collectTask = new CollectTaskRunnble(topic, mqAdminExt, dashboardCollectService);
collectExecutor.submit(collectTask);
}

log.debug("Topic Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getTopicMap().asMap()));
}
catch (Exception err) {
throw Throwables.propagate(err);
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ rocketmq:
accessKey: # if version > 4.4.0
secretKey: # if version > 4.4.0

threadpool:
config:
coreSize: 10
maxSize: 10
keepAliveTime: 3000
queueSize: 5000
4 changes: 2 additions & 2 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder charset="UTF-8">
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %p %t - %m%n</pattern>
</encoder>
</appender>

Expand All @@ -37,7 +37,7 @@
<MaxHistory>10</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern>
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
Expand Down
Loading

0 comments on commit 653332d

Please sign in to comment.