当前位置: 首页 > ds >正文

【redis实战篇】第六天

摘要:

        本文介绍了基于Redis的秒杀系统优化方案,主要包含两部分:1)通过Lua脚本校验用户秒杀资格,结合Java异步处理订单提升性能;2)使用Redis Stream实现消息队列处理订单。方案采用Lua脚本保证库存校验和一人一单的原子性,通过阻塞队列异步保存订单,并引入Redisson分布式锁防止重复下单。

        Redis Stream实现消息队列,支持消费组和ACK确认机制,确保订单可靠处理。系统还设计了pending-list异常处理机制,保证订单处理的最终一致性。这种架构显著提升了秒杀系统的高并发性能和数据一致性。

一,秒杀优化

redis校验用户秒杀资格(库存是否充足且保证一人一单),通过阻塞队列异步处理保存订单到数据库的操作,提升秒杀性能。

1,编写校验秒杀资格lua脚本

库存不足返回1,用户重复下单返回2,资格校验通过返回0。

local voucherId = ARGV[1]
local userId = ARGV[2]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
---库存不足
if tonumber(redis.call('get', 'stockKey'))<=0 thenreturn 1
end
---库存充足,判断用户是否下单
if redis.call('sismember', orderKey, userId) ==1 then---重复下单return 2
end
---扣减库存保存用户id到set中
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
return 0

2,读取lua脚本到Java程序

(1)lua文件读取配置

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();//采用spring的读取文件资源的ClassPathResourceSECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}

(2)调用stringRedisTemlate的execute方法读取SECKILL_SCRIPT脚本,并传入参数ARGV[..],因为脚本中不需要KEYS[..]变量,所以这里传入空集合。

Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString()
);

3,调用阻塞队列BlockingQueue<VoucherOrder>的数组实现并指定大小,将创建好的订单加入到阻塞队列,方法即可结束,大大提升性能。

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
VoucherOrder voucherOrder = new VoucherOrder();
//订单id,用户id,优惠卷id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);

4,开启独立线程处理将订单写入数据库,加上@PostConstruct注解保证在类初始化之后立即执行

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(() -> {while (true) {try {//获取队列的头部,如果需要则等待直到元素可用为止VoucherOrder order = orderTasks.take();handleVoucherOrder(order);} catch (Exception e) {log.error("处理订单异常", e);}}});}

5,order传入上锁的方法handleVoucherOrder(),防止同一个用户的多个请求并发产生的问题,保证每个请求(线程)单独执行

    private void handleVoucherOrder(VoucherOrder order) {RLock lock = redissonClient.getLock(LOCK_ORDER_KEY + order.getUserId());try {boolean isLock = lock.tryLock();if (!isLock) {log.error("操作频繁,请稍后重试!");return;}proxy.createVoucherOrder(order);} finally {lock.unlock();}}

6,编写订单写入数据库的方法createVoucherOrder()并开启事务,这样保证锁的范围比事务范围大,避免出现事务未提交锁提前释放的问题。

    @Transactionalpublic void createVoucherOrder(VoucherOrder order) {//一人一单判断Long userId = order.getUserId();Long voucherId = order.getVoucherId();long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {log.error("不能重复下单!");return;}//扣减库存//这个sql语句是原子性的操作,而LambdaUpdateWrapper表达式不是boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {log.error("库存不足!");return;}save(order);}

 

二,redis实现消息队列

(1)基于list模拟消息队列

(2)基于pubsub的消息队列

(3)基于stream类型的消息队列

基于·redis的stream结构作为消息队列,实现异步秒杀

1,创建STREAM数据stream.order作为阻塞队列和消费组g1

xgroup create stream.order g1 0 mkstream

2,lua脚本增加发送消息到队列中的操作和订单id

local orderId = ARGV[3]
redis.call('xadd', 'stream.order', '*', 'userId', userId,'voucherId', voucherId, 'id', orderId)

3,java客户端获取消息队列中的消息

(1)获取消息队列的订单信息,redis命令:XREADGROUP GROUP g1 c1 count 1 block 2000 streams stream.order >(消费者c1,读取数量1,阻塞时间2000毫秒,>表示从当前消费者组中未消费的最新的消息开始读取)

(2)如果返回的结果为空或者list是空集合,说明没有获取到,continue后面操作重新获取;获取成功,取出消息MapRecord,取出订单信息键值对record.getValue(),最后填入VoucherOrder即可

(3)ack确认消息xack stream.order g1 id

while (true) {try {//获取消息队列的订单信息XREADGROUP GROUP g1 c1 count 1 block 2000 streams stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//获取失败,进行下一次获取循环if (list == null || list.isEmpty()) {continue;}//获取成功,处理消息队列中的订单信息MapRecord<String, Object, Object> record = list.getFirst();Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ack确认消息xack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);//如果中间出现异常那么就在pending-list中保存订单信息handlePendingList();}
}

4,如果中间出现异常那么就在pending-list中保存订单信息

(1)获取pending-list中的订单信息,redis命令:XREADGROUP GROUP g1 c1 count 1 streams stream.order 0(0表示从开始位置读取消息)

(2)获取失败,说明pending-list里面没有异常消息,那么直接结束循环;如果再次出现异常,记录日志,休眠线程一段时间 ,重新开始循环。                                   

    private void handlePendingList() {while (true) {try {List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if (list == null || list.isEmpty()) {break;}MapRecord<String, Object, Object> record = list.getFirst();//获取订单键值对Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list异常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}

http://www.xdnf.cn/news/9840.html

相关文章:

  • 一根网线连接两台电脑组建局域网
  • 不起火,不爆炸,高速摄像机、数字图像相关DIC技术在动力电池新国标安全性能测试中的应用
  • 代码随想录算法训练营第60期第五十一天打卡
  • R3GAN训练自己的数据集
  • Java中float和double的区别与用法解析
  • 华为OD机试真题——阿里巴巴找黄金宝箱(III)(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • WPF 全局加载界面、多界面实现渐变过渡效果
  • DexWild:野外机器人策略的灵巧人机交互
  • 华为OD机试真题——简单的自动曝光平均像素(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • 如何更好的理解云计算和云原生?
  • JDBC连接数据库精准提炼
  • MongoDB(七) - MongoDB副本集安装与配置
  • Python 中的 if-elif-else 语句与控制流详解:从基础到高级应用
  • 电感专题归纳
  • Unity-QFramework框架学习-MVC、Command、Event、Utility、System、BindableProperty
  • 深入理解 SELinux:通过 Nginx 和 SSH 服务配置实践安全上下文与端口策略
  • 家庭路由器改装,搭建openwrt旁路由以及手机存储服务器,实现外网节点转发、内网穿透、远程存储、接入满血DeepSeek方案
  • LVS+keepalived高可用群集
  • mac笔记本如何快捷键截图后自动复制到粘贴板
  • 首发!PPIO派欧云上线DeepSeek-R1-0528-Qwen3-8B蒸馏模型
  • 【数据结构】图论核心算法解析:深度优先搜索(DFS)的纵深遍历与生成树实战指南​
  • Spring Boot 3.5.0中文文档上线
  • 在 WSL Ubuntu-24.04 上安装 Nacos 2.5.1 并使用 MySQL 数据库
  • 【Linux】网络--传输层--深入理解TCP协议
  • 计算机组成与体系结构:固态硬盘(Solid State Drives)
  • 数据驱动健康未来——大数据如何革新公共卫生监测?
  • [250528] NixOS 25.05 “Warbler“ 正式发布:GNOME 48、Kernel 6.12 及海量软件包更新!
  • vue3+element-plus项目主题色切换;element-plus换肤
  • DAX权威指南5:筛选上下文、表操作函数与层级结构
  • SSE vs WebSocket:两种通讯方案该如何选择?