Skip to content

Commit

Permalink
1.延迟队列实现 优化
Browse files Browse the repository at this point in the history
2.完成定时任务 实现逻辑
  • Loading branch information
ZhongFuCheng3y committed Feb 14, 2022
1 parent d3e6265 commit e27e420
Show file tree
Hide file tree
Showing 21 changed files with 257 additions and 248 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ austin项目**核心流程**:`austin-api`接收到发送消息请求,直接

**1**、austin使用的MySQL版本**5.7x**。如果目前使用的MySQL版本8.0,注意改变`pom.xml`所依赖的版本

**2**、适配`application.properties`的配置信息(`srping.datasource`)
**2**、适配`application.properties`的配置信息(`spring.datasource`)

**3**、执行`sql`文件夹下的`austin.sql`创建对应的表

Expand Down Expand Up @@ -97,9 +97,9 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co
- [ ] 04、持续提高消息推送系统的影响力,让更多的业务方了解其功能,进而挖掘更多拉新和唤醒用户的玩法,提高站内的次留率和转化率


**近期更新时间**2022年1月25日
**近期更新时间**2022年2月14日

**近期更新功能**austin前端管理系统
**近期更新功能**接入xxl-job分布式定时任务框架并完成定时任务逻辑

## 项目交流

Expand Down
10 changes: 10 additions & 0 deletions austin-cron/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
<artifactId>austin-support</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.java3y.austin</groupId>
<artifactId>austin-service-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.java3y.austin</groupId>
<artifactId>austin-service-api-impl</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.java3y.austin.cron.constants;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @author 3y
* @date 2022/2/13
* 缓冲pending 常量
* 延迟缓冲 pending 常量信息
*/
public class PendingConstant {

Expand All @@ -23,9 +26,10 @@ public class PendingConstant {
public static final Long TIME_THRESHOLD = 1000L;

/**
* 消费线程数
* 真正消费线程池配置的信息
*/
public static final Integer THREAD_NUM = 2;

public static final Integer CORE_POOL_SIZE = 2;
public static final Integer MAX_POOL_SIZE = 2;
public static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue<>(5);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public void execute() {
log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam());
Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam());
taskHandler.handle(messageTemplateId);

}

}
Original file line number Diff line number Diff line change
@@ -1,39 +1,88 @@
package com.java3y.austin.cron.pending;

import com.java3y.austin.cron.domain.CrowdInfoVo;
import com.java3y.austin.support.pending.BatchPendingThread;
import com.java3y.austin.support.pending.Pending;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.java3y.austin.cron.constants.PendingConstant;
import com.java3y.austin.cron.vo.CrowdInfoVo;
import com.java3y.austin.service.api.domain.BatchSendRequest;
import com.java3y.austin.service.api.domain.MessageParam;
import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.service.SendService;
import com.java3y.austin.support.pending.AbstractLazyPending;
import com.java3y.austin.support.pending.PendingParam;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 批量处理任务信息
* 延迟批量处理人群信息
* 调用 batch 发送接口 进行消息推送
*
* @author 3y
*/
@Component
@Slf4j
public class CrowdBatchTaskPending extends Pending<CrowdInfoVo> {
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class CrowdBatchTaskPending extends AbstractLazyPending<CrowdInfoVo> {

@Override
public void initAndStart(PendingParam pendingParam) {
threadNum = pendingParam.getThreadNum() == null ? threadNum : pendingParam.getThreadNum();
queue = pendingParam.getQueue();

for (int i = 0; i < threadNum; ++i) {
BatchPendingThread<CrowdInfoVo> batchPendingThread = new BatchPendingThread();
batchPendingThread.setPendingParam(pendingParam);
batchPendingThread.setName("batchPendingThread-" + i);
batchPendingThread.start();
}
@Autowired
private SendService sendService;

public CrowdBatchTaskPending() {
PendingParam<CrowdInfoVo> pendingParam = new PendingParam<>();
pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD)
.setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE))
.setTimeThreshold(PendingConstant.TIME_THRESHOLD)
.setExecutorService(ExecutorBuilder.create()
.setCorePoolSize(PendingConstant.CORE_POOL_SIZE)
.setMaxPoolSize(PendingConstant.MAX_POOL_SIZE)
.setWorkQueue(PendingConstant.BLOCKING_QUEUE)
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.build());
this.pendingParam = pendingParam;
}

@Override
public void doHandle(List<CrowdInfoVo> list) {
log.info("theadName:{},doHandle:{}", Thread.currentThread().getName(), list.size());
public void doHandle(List<CrowdInfoVo> crowdInfoVos) {

// 1. 如果参数相同,组装成同一个MessageParam发送
Map<Map<String, String>, String> paramMap = MapUtil.newHashMap();
for (CrowdInfoVo crowdInfoVo : crowdInfoVos) {
String receiver = crowdInfoVo.getReceiver();
Map<String, String> vars = crowdInfoVo.getParams();
if (paramMap.get(vars) == null) {
paramMap.put(vars, receiver);
} else {
String newReceiver = StringUtils.join(new String[]{
paramMap.get(vars), receiver}, StrUtil.COMMA);
paramMap.put(vars, newReceiver);
}
}

// 2. 组装参数
List<MessageParam> messageParams = Lists.newArrayList();
for (Map.Entry<Map<String, String>, String> entry : paramMap.entrySet()) {
MessageParam messageParam = MessageParam.builder().receiver(entry.getValue())
.variables(entry.getKey()).build();
messageParams.add(messageParam);
}

// 3. 调用批量发送接口发送消息
BatchSendRequest batchSendRequest = BatchSendRequest.builder().code(BusinessCode.COMMON_SEND.getCode())
.messageParamList(messageParams)
.messageTemplateId(CollUtil.getFirst(crowdInfoVos.iterator()).getMessageTemplateId())
.build();
sendService.batchSend(batchSendRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.cron.constants.PendingConstant;
import com.java3y.austin.cron.domain.CrowdInfoVo;
import com.java3y.austin.cron.pending.CrowdBatchTaskPending;
import com.java3y.austin.cron.service.TaskHandler;
import com.java3y.austin.cron.utils.ReadFileUtils;
import com.java3y.austin.cron.vo.CrowdInfoVo;
import com.java3y.austin.support.dao.MessageTemplateDao;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.pending.PendingParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @author 3y
Expand All @@ -27,39 +25,31 @@
public class TaskHandlerImpl implements TaskHandler {
@Autowired
private MessageTemplateDao messageTemplateDao;
@Autowired
private CrowdBatchTaskPending crowdBatchTaskPending;

@Autowired
private ApplicationContext context;

@Override
@Async
public void handle(Long messageTemplateId) {
log.info("start:{}", Thread.currentThread().getName());
log.info("TaskHandler handle:{}", Thread.currentThread().getName());

MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get();
if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) {
log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId);
return;
}

// 初始化pending的信息
PendingParam<CrowdInfoVo> pendingParam = new PendingParam<>();
pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD)
.setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE))
.setTimeThreshold(PendingConstant.TIME_THRESHOLD)
.setThreadNum(PendingConstant.THREAD_NUM)
.setPending(crowdBatchTaskPending);
crowdBatchTaskPending.initAndStart(pendingParam);

// 读取文件得到每一行记录给到队列做batch处理
CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class);
// 读取文件得到每一行记录给到队列做lazy batch处理
ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> {
if (CollUtil.isEmpty(row.getFieldMap())
|| StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) {
return;
}
HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap());
CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))
.params(params).build();
.params(params).messageTemplateId(messageTemplateId).build();
crowdBatchTaskPending.pending(crowdInfoVo);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.csv.*;
import com.google.common.base.Throwables;
import com.java3y.austin.cron.domain.CrowdInfoVo;
import com.java3y.austin.cron.vo.CrowdInfoVo;
import lombok.extern.slf4j.Slf4j;

import java.io.FileReader;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.java3y.austin.cron.domain;
package com.java3y.austin.cron.vo;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -21,6 +21,11 @@
@Builder
public class CrowdInfoVo implements Serializable {

/**
* 消息模板Id
*/
private Long messageTemplateId;

/**
* 接收者id
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
* Task 执行器
Expand All @@ -22,6 +25,8 @@
@Data
@Accessors(chain = true)
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Task implements Runnable {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;
Expand All @@ -25,6 +28,8 @@
* 消费MQ的消息
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Receiver {
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public void process(ProcessContext context) {
* @param taskInfo
*/
private void filterIllegalPhoneNum(List<TaskInfo> taskInfo) {
Integer idType = taskInfo.get(0).getIdType();
Integer sendChannel = taskInfo.get(0).getSendChannel();
Integer idType = CollUtil.getFirst(taskInfo.iterator()).getIdType();
Integer sendChannel = CollUtil.getFirst(taskInfo.iterator()).getSendChannel();

if (IdType.PHONE.getCode().equals(idType) && ChannelType.SMS.getCode().equals(sendChannel)) {
Iterator<TaskInfo> iterator = taskInfo.iterator();
Expand All @@ -66,7 +66,7 @@ private void filterIllegalPhoneNum(List<TaskInfo> taskInfo) {

if (CollUtil.isNotEmpty(illegalPhone)) {
task.getReceiver().removeAll(illegalPhone);
log.error("{} find illegal phone!{}", task.getMessageTemplateId(), JSON.toJSONString(illegalPhone));
log.error("messageTemplateId:{} find illegal phone!{}", task.getMessageTemplateId(), JSON.toJSONString(illegalPhone));
}
if (CollUtil.isEmpty(task.getReceiver())) {
iterator.remove();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.java3y.austin.service.api.impl.action;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -35,7 +36,8 @@ public void process(ProcessContext context) {
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
, JSON.toJSONString(sendTaskModel.getTaskInfo().get(0)));
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));

}
}
}
Loading

0 comments on commit e27e420

Please sign in to comment.