Skip to content

Commit

Permalink
RabbitMQ,路由(topic)状态下,实现异步秒杀
Browse files Browse the repository at this point in the history
  • Loading branch information
haopengmai committed Nov 17, 2023
1 parent d32469c commit 388edbb
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 70 deletions.
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,20 @@
<version>1.4</version>
</dependency>
<!--这部分是邮箱相关的依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

</dependencies>

Expand Down
53 changes: 53 additions & 0 deletions src/main/java/com/hmdp/config/RabbitMQTopicConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.hmdp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQTopicConfig {
public static final String QUEUE = "seckillQueue";
public static final String EXCHANGE = "seckillExchange";
public static final String ROUTINGKEY = "seckill.#";
@Bean
public Queue queue(){
return new Queue(QUEUE);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTINGKEY);
}
// private static final String QUEUE01="queue_topic01";
// private static final String QUEUE02="queue_topic02";
// private static final String EXCHANGE = "topicExchange";
// private static final String ROUTINGKEY01 = "#.queue.#";
// private static final String ROUTINGKEY02 = "*.queue.#";
// @Bean
// public Queue topicqueue01(){
// return new Queue(QUEUE01);
// }
// @Bean
// public Queue topicqueue02(){
// return new Queue(QUEUE02);
// }
// @Bean
// public TopicExchange topicExchange(){
// return new TopicExchange(EXCHANGE);
// }
// @Bean
// public Binding topicbinding01(){
// return BindingBuilder.bind(topicqueue01()).to(topicExchange()).with(ROUTINGKEY01);
// }
// @Bean
// public Binding topicbinding02(){
// return BindingBuilder.bind(topicqueue02()).to(topicExchange()).with(ROUTINGKEY02);
// }
}
64 changes: 64 additions & 0 deletions src/main/java/com/hmdp/rebbitmq/MQReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.hmdp.rebbitmq;

import com.alibaba.fastjson.JSON;
import com.hmdp.config.RabbitMQTopicConfig;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
* 消息消费者
*/
@Slf4j
@Service
public class MQReceiver {

@Resource
IVoucherOrderService voucherOrderService;

@Resource
ISeckillVoucherService seckillVoucherService;
/**
* 接收秒杀信息并下单
* @param msg
*/
@Transactional
@RabbitListener(queues = RabbitMQTopicConfig.QUEUE)
public void receiveSeckillMessage(String msg){
log.info("接收到消息: "+msg);
VoucherOrder voucherOrder = JSON.parseObject(msg, VoucherOrder.class);

Long voucherId = voucherOrder.getVoucherId();
//5.一人一单
Long userId = voucherOrder.getUserId();
//5.1查询订单
int count = voucherOrderService.query().eq("user_id",userId).eq("voucher_id", voucherId).count();
//5.2判断是否存在
if(count>0){
//用户已经购买过了
log.error("该用户已购买过");
return ;
}
log.info("扣减库存");
//6.扣减库存
boolean success = seckillVoucherService
.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId)
.gt("stock",0)//cas乐观锁
.update();
if(!success){
log.error("库存不足");
return;
}
//直接保存订单
voucherOrderService.save(voucherOrder);
}

}
28 changes: 28 additions & 0 deletions src/main/java/com/hmdp/rebbitmq/MQSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.hmdp.rebbitmq;

import com.hmdp.config.RabbitMQTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* 消息发送者
*/
@Slf4j
@Service
public class MQSender {

@Autowired
private RabbitTemplate rabbitTemplate;

private static final String ROUTINGKEY = "seckill.message";
/**
* 发送秒杀信息
* @param msg
*/
public void sendSeckillMessage(String msg){
log.info("发送消息"+msg);
rabbitTemplate.convertAndSend(RabbitMQTopicConfig.EXCHANGE,ROUTINGKEY,msg);
}
}
153 changes: 83 additions & 70 deletions src/main/java/com/hmdp/service/impl/VoucherOrderServiceImpl.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.hmdp.service.impl;

import com.alibaba.fastjson.JSON;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.rebbitmq.MQSender;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
Expand All @@ -13,11 +15,15 @@
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collections;

/**
* <p>
Expand All @@ -29,51 +35,58 @@
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
//该类实现了mybatisplus,用它来在数据库中查询数据
@Resource
private VoucherOrderServiceImpl voucherOrderService;

@Autowired
private RedisIdWorker redisIdWorker;
@Resource
private MQSender mqSender;

@Resource
private RedissonClient redissonClient;
private StringRedisTemplate stringRedisTemplate;

//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

@Override
public Result seckillVoucher(Long voucherId) {
//1. 从数据库中查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//1.执行lua脚本
Long userId = UserHolder.getUser().getId();

//2. 判断秒杀时间是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now()))
{
return Result.fail("秒杀还未开始,请耐心等待");
Long r = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
//2.判断结果为0
int result = r.intValue();
if (result != 0) {
//2.1不为0代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "该用户重复下单");
}
//2.2为0代表有购买资格,将下单信息保存到阻塞队列

//3. 判断秒杀时间是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
//2.3创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//2.4订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//2.5用户id
voucherOrder.setUserId(userId);
//2.6代金卷id
voucherOrder.setVoucherId(voucherId);

//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
//2.7将信息放入MQ中
mqSender.sendSeckillMessage(JSON.toJSONString(voucherOrder));

Long userId = UserHolder.getUser().getId();
RLock redisLock = redissonClient.getLock("order:" + userId);
boolean isLock = redisLock.tryLock();//不传入参数,就是失败不等待

if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
return voucherOrderService.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
//2.7 返回订单id
return Result.ok(orderId);
// 单机模式下,使用synchronized实现锁
// synchronized (userId.toString().intern())
// {
Expand All @@ -84,42 +97,42 @@ public Result seckillVoucher(Long voucherId) {
}


@Transactional
public Result createVoucherOrder(Long voucherId) {
// 一人一单逻辑
Long userId = UserHolder.getUser().getId();


int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
if (count > 0){
return Result.fail("你已经抢过优惠券了哦");
}

//5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0) //加了CAS 乐观锁,Compare and swap
.update();

if (!success) {
return Result.fail("库存不足");
}

// 库存足且在时间范围内的,则创建新的订单
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 设置订单id,生成订单的全局id
long orderId = redisIdWorker.nextId("order");
//6.2 设置用户id
Long id = UserHolder.getUser().getId();
//6.3 设置代金券id
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(id);
//7. 将订单数据保存到表中
save(voucherOrder);
//8. 返回订单id
return Result.ok(orderId);
}
// @Transactional
// public Result createVoucherOrder(Long voucherId) {
// // 一人一单逻辑
// Long userId = UserHolder.getUser().getId();
//
//
// int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
// if (count > 0){
// return Result.fail("你已经抢过优惠券了哦");
// }
//
// //5. 扣减库存
// boolean success = seckillVoucherService.update()
// .setSql("stock = stock - 1")
// .eq("voucher_id", voucherId)
// .gt("stock",0) //加了CAS 乐观锁,Compare and swap
// .update();
//
// if (!success) {
// return Result.fail("库存不足");
// }
//
//// 库存足且在时间范围内的,则创建新的订单
// //6. 创建订单
// VoucherOrder voucherOrder = new VoucherOrder();
// //6.1 设置订单id,生成订单的全局id
// long orderId = redisIdWorker.nextId("order");
// //6.2 设置用户id
// Long id = UserHolder.getUser().getId();
// //6.3 设置代金券id
// voucherOrder.setVoucherId(voucherId);
// voucherOrder.setId(orderId);
// voucherOrder.setUserId(id);
// //7. 将订单数据保存到表中
// save(voucherOrder);
// //8. 返回订单id
// return Result.ok(orderId);
// }
}
20 changes: 20 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ spring:
database: 6
jackson:
default-property-inclusion: non_null # JSON处理时忽略非空字段
rabbitmq:
host: 192.168.88.128
username: haopengmai
password: 123456
virtual-host: /
port: 15672
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
template:
retry:
enabled: true
initial-interval: 1000ms
max-attempts: 3
max-interval: 10000ms
multiplier: 1
mybatis-plus:
type-aliases-package: com.hmdp.entity # 别名扫描包
logging:
Expand Down
Loading

0 comments on commit 388edbb

Please sign in to comment.