Skip to content

Commit

Permalink
1. 每行读取csv文件(不再一次性读取)
Browse files Browse the repository at this point in the history
2. 读取后的内容做batch(为后续调用批量接口服务)--实现生产者与消费者
  • Loading branch information
ZhongFuCheng3y committed Feb 13, 2022
1 parent 7b7bd84 commit d3e6265
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.java3y.austin.cron.config;

import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 处理定时任务的线程池配置信息,为@Async注解服务
*
* @author 3y
*/
@Slf4j
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

@Bean("austinExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("austinAsyncExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.initialize();
return executor;
}

@Override
public Executor getAsyncExecutor() {
return executor();
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error("austinExecutor execute fail!method:{},params:{},ex:{}", method, params, Throwables.getStackTraceAsString(ex));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.java3y.austin.cron.constants;

/**
* @author 3y
* @date 2022/2/13
* 缓冲pending 常量
*/
public class PendingConstant {

/**
* 阻塞队列大小
*/
public static final Integer QUEUE_SIZE = 100;

/**
* 触发执行的数量阈值
*/
public static final Integer NUM_THRESHOLD = 100;

/**
* batch 触发执行的时间阈值,单位毫秒【必填】
*/
public static final Long TIME_THRESHOLD = 1000L;

/**
* 消费线程数
*/
public static final Integer THREAD_NUM = 2;


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class CrowdInfoVo implements Serializable {
/**
* 接收者id
*/
private String id;
private String receiver;

/**
* 参数信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ public class CronTaskHandler {
*/
@XxlJob("austinJob")
public void execute() {
log.info("XXL-JOB, Hello World.");
log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam());
Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam());

taskHandler.handle(messageTemplateId);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.java3y.austin.cron.pending;

import com.java3y.austin.cron.domain.CrowdInfoVo;
import com.java3y.austin.support.pending.BatchPendingThread;
import com.java3y.austin.support.pending.Pending;
import com.java3y.austin.support.pending.PendingParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* 批量处理任务信息
*
* @author 3y
*/
@Component
@Slf4j
public class CrowdBatchTaskPending extends Pending<CrowdInfoVo> {

@Override
public void initAndStart(PendingParam pendingParam) {
threadNum = pendingParam.getThreadNum() == null ? threadNum : pendingParam.getThreadNum();
queue = pendingParam.getQueue();

for (int i = 0; i < threadNum; ++i) {
BatchPendingThread<CrowdInfoVo> batchPendingThread = new BatchPendingThread();
batchPendingThread.setPendingParam(pendingParam);
batchPendingThread.setName("batchPendingThread-" + i);
batchPendingThread.start();
}
}

@Override
public void doHandle(List<CrowdInfoVo> list) {
log.info("theadName:{},doHandle:{}", Thread.currentThread().getName(), list.size());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.cron.constants.PendingConstant;
import com.java3y.austin.cron.domain.CrowdInfoVo;
import com.java3y.austin.cron.pending.CrowdBatchTaskPending;
import com.java3y.austin.cron.service.TaskHandler;
import com.java3y.austin.cron.utils.ReadFileUtils;
import com.java3y.austin.support.dao.MessageTemplateDao;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.pending.PendingParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @author 3y
Expand All @@ -24,23 +27,42 @@
public class TaskHandlerImpl implements TaskHandler {
@Autowired
private MessageTemplateDao messageTemplateDao;
@Autowired
private CrowdBatchTaskPending crowdBatchTaskPending;


@Override
@Async
public void handle(Long messageTemplateId) {
log.info("start:{}", Thread.currentThread().getName());

MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get();
if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) {
log.error("TaskHandler#handle crowdPath empty!");
log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId);
return;
}
List<CrowdInfoVo> csvRowList = ReadFileUtils.getCsvRowList(messageTemplate.getCronCrowdPath());

if (CollUtil.isNotEmpty(csvRowList)) {
// 初始化pending的信息
PendingParam<CrowdInfoVo> pendingParam = new PendingParam<>();
pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD)
.setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE))
.setTimeThreshold(PendingConstant.TIME_THRESHOLD)
.setThreadNum(PendingConstant.THREAD_NUM)
.setPending(crowdBatchTaskPending);
crowdBatchTaskPending.initAndStart(pendingParam);

}
// 读取文件得到每一行记录给到队列做batch处理
ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> {
if (CollUtil.isEmpty(row.getFieldMap())
|| StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) {
return;
}
HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap());
CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))
.params(params).build();
crowdBatchTaskPending.pending(crowdInfoVo);
});

log.info("csv info:", JSON.toJSONString(csvRowList));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.csv.CsvData;
import cn.hutool.core.text.csv.CsvRow;
import cn.hutool.core.text.csv.CsvUtil;
import cn.hutool.core.text.csv.*;
import com.google.common.base.Throwables;
import com.java3y.austin.cron.domain.CrowdInfoVo;
import lombok.extern.slf4j.Slf4j;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -23,7 +23,47 @@
public class ReadFileUtils {

/**
* 读取csv文件
* csv文件 存储 接收者 的列名
*/
public static final String RECEIVER_KEY = "userId";

/**
* 读取csv文件,每读取一行都会调用 csvRowHandler 对应的方法
*
* @param path
* @param csvRowHandler
*/
public static void getCsvRow(String path, CsvRowHandler csvRowHandler) {
try {
// 把首行当做是标题,获取reader
CsvReader reader = CsvUtil.getReader(new FileReader(path),
new CsvReadConfig().setContainsHeader(true));
reader.read(csvRowHandler);
} catch (Exception e) {
log.error("ReadFileUtils#getCsvRow fail!{}", Throwables.getStackTraceAsString(e));

}
}

/**
* 从文件的每一行数据获取到params信息
* [{key:value},{key:value}]
* @param fieldMap
* @return
*/
public static HashMap<String, String> getParamFromLine(Map<String, String> fieldMap) {
HashMap<String, String> params = MapUtil.newHashMap();
for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
if (!ReadFileUtils.RECEIVER_KEY.equals(entry.getKey())) {
params.put(entry.getKey(), entry.getValue());
}
}
return params;
}


/**
* 一次性读取csv文件整个内容
* 1. 获取第一行信息(id,paramsKey1,params2Key2),第一列默认为接收者Id
* 2. 把文件信息塞进对象内
* 3. 把对象返回
Expand All @@ -46,12 +86,14 @@ public static List<CrowdInfoVo> getCsvRowList(String path) {
for (int j = 1; j < headerInfo.size(); j++) {
param.put(headerInfo.get(j), row.get(j));
}
result.add(CrowdInfoVo.builder().id(row.get(0)).params(param).build());
result.add(CrowdInfoVo.builder().receiver(row.get(0)).params(param).build());
}

} catch (Exception e) {
log.error("TaskHandler#getCsvRowList fail!{}", Throwables.getStackTraceAsString(e));
log.error("ReadFileUtils#getCsvRowList fail!{}", Throwables.getStackTraceAsString(e));
}
return result;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void run() {
return;
}

// 1.平台通用去重 test
// 1.平台通用去重
deduplicationRuleService.duplication(taskInfo);

// 2. 真正发送消息
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.java3y.austin.support.pending;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 延迟消费的线程 实现
* 积攒一定的数量 或 时间 才消费,达到批量消费的效果
*
* @author 3y
*/
@Data
@Accessors(chain = true)
@Slf4j
public class BatchPendingThread<T> extends Thread {

private PendingParam<T> pendingParam;

/**
* 批量装载任务
*/
private List<T> tasks = new ArrayList<>();

/**
* 当前装载任务的大小
*/
private Integer total = 0;

/**
* 上次执行的时间
*/
private Long lastHandleTime = System.currentTimeMillis();


@Override
public void run() {
while (true) {
try {
T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS);
if (null != obj) {
tasks.add(obj);
}

// 处理条件:1. 数量超限 2. 时间超限
if ((tasks.size() >= pendingParam.getNumThreshold())
|| (System.currentTimeMillis() - lastHandleTime >= pendingParam.getTimeThreshold())) {
List<T> taskRef = tasks;
tasks = Lists.newArrayList();
lastHandleTime = System.currentTimeMillis();
pendingParam.getPending().handle(taskRef);
}
} catch (Exception e) {
log.error("BatchPendingThread#run failed:{}", Throwables.getStackTraceAsString(e));
}
}
}
}
Loading

0 comments on commit d3e6265

Please sign in to comment.