From 653332d488fa607d9ddfe3c25ea32fff7e4d627d Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Fri, 11 Feb 2022 09:27:53 +0800 Subject: [PATCH] [ISSUE #68]Use multithreading for topic data collection in collectTask (#69) * [ISSUE #68]Use multithreading for topic data collection in collectTask * modify ut * Optimize exception log printing Co-authored-by: zhangjidi --- .../dashboard/admin/MQAdminFactory.java | 4 + .../config/CollectExecutorConfig.java | 60 ++++++++++ .../dashboard/task/CollectTaskRunnble.java | 108 ++++++++++++++++++ .../dashboard/task/DashboardCollectTask.java | 98 ++-------------- src/main/resources/application.yml | 6 + src/main/resources/logback.xml | 4 +- .../config/CollectExecutorConfigTest.java | 50 ++++++++ .../task/DashboardCollectTaskTest.java | 35 +++++- 8 files changed, 271 insertions(+), 94 deletions(-) create mode 100644 src/main/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfig.java create mode 100644 src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java create mode 100644 src/test/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfigTest.java diff --git a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java index 8bbf19d5..e5e90c73 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java +++ b/src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.dashboard.admin; +import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; @@ -33,6 +34,8 @@ public MQAdminFactory(RMQConfigure rmqConfigure) { this.rmqConfigure = rmqConfigure; } + private final AtomicLong adminIndex = new AtomicLong(0); + public MQAdminExt getInstance() throws Exception { RPCHook rpcHook = null; final String accessKey = rmqConfigure.getAccessKey(); @@ -47,6 +50,7 @@ public MQAdminExt getInstance() throws Exception { } else { mqAdminExt = new DefaultMQAdminExt(rpcHook, rmqConfigure.getTimeoutMillis()); } + mqAdminExt.setAdminExtGroup(mqAdminExt.getAdminExtGroup() + "_" + adminIndex.getAndIncrement()); mqAdminExt.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel())); mqAdminExt.setUseTLS(rmqConfigure.isUseTLS()); mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfig.java b/src/main/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfig.java new file mode 100644 index 00000000..510ab8e8 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.dashboard.config; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "threadpool.config") +@Data +public class CollectExecutorConfig { + private int coreSize = 20; + private int maxSize = 20; + private long keepAliveTime = 3000L; + private int queueSize = 1000; + + @Bean(name = "collectExecutor") + public ExecutorService collectExecutor(CollectExecutorConfig collectExecutorConfig) { + ExecutorService collectExecutor = new ThreadPoolExecutor( + collectExecutorConfig.getCoreSize(), + collectExecutorConfig.getMaxSize(), + collectExecutorConfig.getKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(collectExecutorConfig.getQueueSize()), + new ThreadFactory() { + private final AtomicLong threadIndex = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "collectTopicThread_" + this.threadIndex.incrementAndGet()); + } + }, + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return collectExecutor; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java new file mode 100644 index 00000000..70856200 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/task/CollectTaskRunnble.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.task; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import java.math.BigDecimal; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.dashboard.service.DashboardCollectService; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; + +@Slf4j +public class CollectTaskRunnble implements Runnable { + + private String topic; + + private MQAdminExt mqAdminExt; + + private DashboardCollectService dashboardCollectService; + + public CollectTaskRunnble(String topic, MQAdminExt mqAdminExt, + DashboardCollectService dashboardCollectService) { + this.topic = topic; + this.mqAdminExt = mqAdminExt; + this.dashboardCollectService = dashboardCollectService; + } + + @Override + public void run() { + Date date = new Date(); + try { + TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic); + GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); + double inTPS = 0; + long inMsgCntToday = 0; + double outTPS = 0; + long outMsgCntToday = 0; + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic); + inTPS += bsd.getStatsMinute().getTps(); + inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); + } catch (Exception e) { + log.warn("Exception caught: mqAdminExt get broker stats data TOPIC_PUT_NUMS failed", e.getMessage()); + } + } + } + if (groupList != null && !groupList.getGroupList().isEmpty()) { + for (String group : groupList.getGroupList()) { + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + String statsKey = String.format("%s@%s", topic, group); + BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey); + outTPS += bsd.getStatsMinute().getTps(); + outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); + } catch (Exception e) { + log.warn("Exception caught: mqAdminExt get broker stats data GROUP_GET_NUMS failed", e.getMessage()); + } + } + } + } + } + + List list; + try { + list = dashboardCollectService.getTopicMap().get(topic); + } catch (ExecutionException e) { + throw Throwables.propagate(e); + } + if (null == list) { + list = Lists.newArrayList(); + } + + list.add(date.getTime() + "," + new BigDecimal(inTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + inMsgCntToday + "," + new BigDecimal(outTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + outMsgCntToday); + dashboardCollectService.getTopicMap().put(topic, list); + } catch (Exception e) { + log.error("Failed to collect topic: {} data", topic, e); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java index c09568d7..70a558c5 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java @@ -16,17 +16,6 @@ */ package org.apache.rocketmq.dashboard.task; -import com.google.common.base.Stopwatch; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.apache.rocketmq.tools.admin.MQAdminExt; -import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import com.google.common.base.Throwables; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; @@ -42,13 +31,18 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import javax.annotation.Resource; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.service.DashboardCollectService; import org.apache.rocketmq.dashboard.util.JsonUtil; +import org.apache.rocketmq.tools.admin.MQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -67,14 +61,14 @@ public class DashboardCollectTask { private final static Logger log = LoggerFactory.getLogger(DashboardCollectTask.class); + @Resource + private ExecutorService collectExecutor; + @Scheduled(cron = "30 0/1 * * * ?") public void collectTopic() { if (!rmqConfigure.isEnableDashBoardCollect()) { return; } - - Date date = new Date(); - Stopwatch stopwatch = Stopwatch.createUnstarted(); try { TopicList topicList = mqAdminExt.fetchAllTopicList(); Set topicSet = topicList.getTopicList(); @@ -85,77 +79,9 @@ public void collectTopic() { || TopicValidator.isSystemTopic(topic)) { continue; } - TopicRouteData topicRouteData = mqAdminExt.examineTopicRouteInfo(topic); - - GroupList groupList = mqAdminExt.queryTopicConsumeByWho(topic); - - double inTPS = 0; - - long inMsgCntToday = 0; - - double outTPS = 0; - - long outMsgCntToday = 0; - - for (BrokerData bd : topicRouteData.getBrokerDatas()) { - String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); - if (masterAddr != null) { - try { - stopwatch.start(); - log.info("start time: {}", stopwatch.toString()); - BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic); - stopwatch.stop(); - log.info("stop time : {}", stopwatch.toString()); - - inTPS += bsd.getStatsMinute().getTps(); - inMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); - } - catch (Exception e) { - stopwatch.reset(); - log.warn("Exception caught: mqAdminExt get broker stats data TOPIC_PUT_NUMS failed"); - log.warn("Response [{}] ", e.getMessage()); - } - } - } - - if (groupList != null && !groupList.getGroupList().isEmpty()) { - - for (String group : groupList.getGroupList()) { - for (BrokerData bd : topicRouteData.getBrokerDatas()) { - String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); - if (masterAddr != null) { - try { - String statsKey = String.format("%s@%s", topic, group); - BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey); - outTPS += bsd.getStatsMinute().getTps(); - outMsgCntToday += StatsAllSubCommand.compute24HourSum(bsd); - } - catch (Exception e) { - log.warn("Exception caught: mqAdminExt get broker stats data GROUP_GET_NUMS failed"); - log.warn("Response [{}] ", e.getMessage()); - } - } - } - } - } - - List list; - try { - list = dashboardCollectService.getTopicMap().get(topic); - } - catch (ExecutionException e) { - throw Throwables.propagate(e); - } - if (null == list) { - list = Lists.newArrayList(); - } - - list.add(date.getTime() + "," + new BigDecimal(inTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + inMsgCntToday + "," + new BigDecimal(outTPS).setScale(5, BigDecimal.ROUND_HALF_UP) + "," + outMsgCntToday); - dashboardCollectService.getTopicMap().put(topic, list); - + CollectTaskRunnble collectTask = new CollectTaskRunnble(topic, mqAdminExt, dashboardCollectService); + collectExecutor.submit(collectTask); } - - log.debug("Topic Collected Data in memory = {}" + JsonUtil.obj2String(dashboardCollectService.getTopicMap().asMap())); } catch (Exception err) { throw Throwables.propagate(err); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4a6a207c..02d11e25 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -61,3 +61,9 @@ rocketmq: accessKey: # if version > 4.4.0 secretKey: # if version > 4.4.0 +threadpool: + config: + coreSize: 10 + maxSize: 10 + keepAliveTime: 3000 + queueSize: 5000 \ No newline at end of file diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 3fa3fa5e..76823e89 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -19,7 +19,7 @@ - [%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n + [%d{yyyy-MM-dd HH:mm:ss.SSS}] %p %t - %m%n @@ -37,7 +37,7 @@ 10 - [%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n + [%d{yyyy-MM-dd HH:mm:ss.SSS}] %p %t - %m%n UTF-8 diff --git a/src/test/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfigTest.java b/src/test/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfigTest.java new file mode 100644 index 00000000..d6ab097d --- /dev/null +++ b/src/test/java/org/apache/rocketmq/dashboard/config/CollectExecutorConfigTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.config; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; + +public class CollectExecutorConfigTest { + + private final static int COUNT = 100; + + @Test + public void testCollectExecutor() throws Exception { + AtomicInteger num = new AtomicInteger(0); + CollectExecutorConfig config = new CollectExecutorConfig(); + config.setCoreSize(10); + config.setMaxSize(10); + config.setQueueSize(500); + config.setKeepAliveTime(3000); + ExecutorService collectExecutor = config.collectExecutor(config); + Assert.assertNotNull(collectExecutor); + CountDownLatch countDownLatch = new CountDownLatch(COUNT); + for (int i = 0; i < COUNT; i++) { + collectExecutor.submit(() -> { + num.getAndIncrement(); + countDownLatch.countDown(); + }); + } + countDownLatch.await(); + System.out.println(collectExecutor.isTerminated()); + Assert.assertEquals(COUNT, num.get()); + } +} diff --git a/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java b/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java index b8dfc258..c8206556 100644 --- a/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java +++ b/src/test/java/org/apache/rocketmq/dashboard/task/DashboardCollectTaskTest.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -38,6 +40,7 @@ import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.dashboard.BaseTest; +import org.apache.rocketmq.dashboard.config.CollectExecutorConfig; import org.apache.rocketmq.dashboard.config.RMQConfigure; import org.apache.rocketmq.dashboard.service.impl.DashboardCollectServiceImpl; import org.apache.rocketmq.dashboard.util.JsonUtil; @@ -68,6 +71,9 @@ public class DashboardCollectTaskTest extends BaseTest { @Mock private RMQConfigure rmqConfigure; + @Mock + private ExecutorService collectExecutor; + private int taskExecuteNum = 10; private File brokerFile; @@ -96,6 +102,7 @@ public void testCollectTopic() throws Exception { { TopicList topicList = new TopicList(); Set topicSet = new HashSet<>(); + topicSet.add("rmq_sys_xxx"); topicSet.add("topic_test"); topicSet.add("%RETRY%group_test"); topicSet.add("%DLQ%group_test"); @@ -121,19 +128,35 @@ public void testCollectTopic() throws Exception { } catch (Exception e) { Assert.assertEquals(e.getMessage(), "fetchAllTopicList exception"); } + dashboardCollectTask.collectTopic(); + + // multiple topic collection + CollectExecutorConfig config = new CollectExecutorConfig(); + config.setCoreSize(10); + config.setMaxSize(10); + config.setQueueSize(500); + config.setKeepAliveTime(3000); + ExecutorService collectExecutor = config.collectExecutor(config); for (int i = 0; i < taskExecuteNum; i++) { - dashboardCollectTask.collectTopic(); + CollectTaskRunnble collectTask = new CollectTaskRunnble("topic_test" + i, mqAdminExt, dashboardCollectService); + collectExecutor.submit(collectTask); + } + collectExecutor.shutdown(); + boolean loop = true; + do { + // Wait for all collectTasks to complete + loop = !collectExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); } + while (loop); LoadingCache> map = dashboardCollectService.getTopicMap(); - Assert.assertEquals(map.size(), 1); - Assert.assertEquals(map.get("topic_test").size(), taskExecuteNum); + Assert.assertEquals(map.size(), taskExecuteNum); dashboardCollectTask.saveData(); Assert.assertEquals(topicFile.exists(), true); Map> topicData = JsonUtil.string2Obj(MixAll.file2String(topicFile), new TypeReference>>() { }); - Assert.assertEquals(topicData.get("topic_test").size(), taskExecuteNum); + Assert.assertEquals(topicData.size(), taskExecuteNum); } @Test @@ -187,8 +210,8 @@ public void after() { private void mockBrokerFileExistBeforeSaveData() throws Exception { Map> map = new HashMap<>(); - map.put("broker-a" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); - map.put("broker-b" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); + map.put("broker-a" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); + map.put("broker-b" + ":" + MixAll.MASTER_ID, Lists.asList("1000", new String[] {"1000"})); MixAll.string2File(JsonUtil.obj2String(map), brokerFile.getAbsolutePath()); } }