Redis实战-优惠券秒杀解决方案总结大全
1.添加优惠券
1.1优惠券数据库设计
设计了两种优惠券,一种是普通优惠券,一种是秒杀优惠券。
1.1.1tb_voucher的设计
tb_voucher的设计是为了存储优惠券的基本信息,优惠金额,使用规则。
支付金额和抵扣金额使用分作为单位,这样就可以不使用浮点数了,直接进行使用整数即可。
type是决定类型的,可以存储普通券/秒杀券。
statuc可以进行存储状态,使用的也是tinyint。
你可以看这几个设定,大量使用了unsigned无符号设计,可以进行利用更多空间,存储空间多了整整一倍呢!
一些不需要考虑存储负值的数据,完全可以设计为unsigned无符号。
create table tb_voucher
(id bigint unsigned auto_increment comment '主键'primary key,shop_id bigint unsigned null comment '商铺id',title varchar(255) not null comment '代金券标题',sub_title varchar(255) null comment '副标题',rules varchar(1024) null comment '使用规则',pay_value bigint unsigned not null comment '支付金额,单位是分。例如200代表2元',actual_value bigint not null comment '抵扣金额,单位是分。例如200代表2元',type tinyint unsigned default '0' not null comment '0,普通券;1,秒杀券',status tinyint unsigned default '1' not null comment '1,上架; 2,下架; 3,过期',create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
)collate = utf8mb4_general_cirow_format = COMPACT;
1.1.2tb_seckill_voucher的设计
tb_seckill_voucher是为秒杀卷准备的,因为普通卷没有进行设计库存和有效期,只进行设定了是否可以进行使用,这里主要是进行关联的秒杀卷,秒杀卷优惠力度高,肯定是要由有效时间,库存量的,不能无限发放,主要配合活动进行使用。
create table tb_seckill_voucher
(voucher_id bigint unsigned not null comment '关联的优惠券的id'primary key,stock int not null comment '库存',create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',begin_time timestamp default CURRENT_TIMESTAMP not null comment '生效时间',end_time timestamp default CURRENT_TIMESTAMP not null comment '失效时间',update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
)comment '秒杀优惠券表,与优惠券是一对一关系' collate = utf8mb4_general_cirow_format = COMPACT;
1.2申请添加优惠券的接口
一次可以进行把两个表的数据都添加好,并且设置了事务回滚。
2.实现优惠券秒杀下单
2.1数据库设计
数据库设计的非常好,该使用unsigned的地方都进行使用了unsigned(比如ID部分,状态值,类型值),这些无需进行显示负值的数据都会进行使用unsigned的,可以进行帮助利用很多空间,可以存储的有效内容扩大了一倍!!!
时间方面,先说一下整个订单的生命周期:
创建秒杀订单 => 支付秒杀订单 => 使用秒杀订单。
创建秒杀订单 => 支付秒杀订单 => 退款。
字符集方面,设计数据库的时候也进行指定了字符集,collate = utf8mb4_general_ci,使用这个是进行设定为ut8mb4,general_cl是进行设置比较字符串的时候,进行忽略大小写。
使用row_format = COMPACT是进行让MySQL将数据存储的更加紧凑,以剑圣空间,比较适合行数据比较短但是有很多行的情况。
create table tb_voucher_order
(id bigint not null comment '主键'primary key,user_id bigint unsigned not null comment '下单的用户id',voucher_id bigint unsigned not null comment '购买的代金券id',pay_type tinyint unsigned default '1' not null comment '支付方式 1:余额支付;2:支付宝;3:微信',status tinyint unsigned default '1' not null comment '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',create_time timestamp default CURRENT_TIMESTAMP not null comment '下单时间',pay_time timestamp null comment '支付时间',use_time timestamp null comment '核销时间',refund_time timestamp null comment '退款时间',update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
)collate = utf8mb4_general_cirow_format = COMPACT;
2.2业务逻辑
先判断秒杀是否开始/结束,如果未开始/已结束就无法下单。
然后判断库存是否存储,不足则无法下单。
如果可以进行下单就进行创建订单,但是这只是秒杀功能的一部分而已,这里下单成功后并不会进行。
2.3业务实现
这是业务实现代码,我们就借助几个比较重要的点进行分析一下几个比较重要的点。
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {// 1. 查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2. 判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀未开始!");}// 3. 判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已经结束return Result.fail("秒杀已经结束!");}// 4. 判断库存是否充足if (voucher.getStock() <= 0) {// 库存不足return Result.fail("库存不足!");}// 5. 扣减库存boolean isSuccess = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!isSuccess) {// 扣减失败return Result.fail("库存不足!");}// 6. 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1 订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2 用户IDLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3 代金券IDvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7. 返回订单号return Result.ok(voucherOrder);
}
2.3.1时间的使用
这里实体类的时间接收使用的是LocalDateTime进行接收数据库的DateTime时间。
自从JDK8推出了LocalDateTime API之后,建议进行使用LocalDateTime的进行接收时间,并且MyBatis和MyBatisPlus也是支持这样的映射关系的。
完成秒杀的判断秒杀是否开始/结束,可以进行使用LocalDateTime提供的API,isAfrer和isBefore。
进行接收一个LocalDateTime对象进行判断,返回一个Boolean值。
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀未开始!");
}
// 3. 判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已经结束return Result.fail("秒杀已经结束!");
}
2.3.2扣减库存 => 有意思的MyBatisPlus调用链
业务很简单,使用setSql进行设置SET部分的SQL语句,然后后面使用eq进行where过滤为指定优惠券,最后调用uodate方法进行执行更新。
boolean isSuccess = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
MyBatisPlus的update调用链,MyBatisPlus的ServiceImpl接口提供了update()方法,这个方法也支持链式调用,可以使用setSql进行设置更新语句的Set部分,我们看一下这个语法。
这是SQL语句的定义。
Update table_name SET field = value, [field = value...] Where ...
setSql("sql语句"),可以直接设置SET部分的SQL语句,提供了更大的灵活性。
2.3.3事务的设定
由于在整个函数中进行使用了扣减秒杀卷的库存,也进行新增了秒杀订单,涉及到了两个表的改动,所以为了保证两表的一致性,需要对函数进行设置事务,直接在方法上进行使用@Transition注解即可开始SpringBoot的事务。
3.秒杀优化之解决超卖问题
3.1超卖问题是如何出现的?
为什么会出现超卖问题呢?因为多个线程同时进行并发来的时候可能会出现问题。
因为整个流程是先进行查询库存看看库存是否足够嘛,假如当还有一个优惠卷的时候,一个线程进行查询了库存,还没有进行扣减库存的时候,在这个中间时刻,冲进来一堆线程也查询了数据库,那么数据一定是还有库存,在更新的时候我们没有进行做校验操作什么的,很多可能出现多个线程同时更新成功的情况,就导致出现了超卖问题。
进行定义200个线程,一秒内打出去。
发现果然出现了超卖问题。
吞吐量其实还不错,可以达到195.3次/s,时间平均值在199ms左右,对于多次数据库查询也是很强了。
3.2超卖问题如何解决?
其实解决的方案就是加锁,加锁的方案可以采用悲观锁,也可以采用乐观锁。
但是像这种高并发请求,其实使用悲观锁不是一个好方案。会明显降低并发量,因为所有线程都是要进行等待的,所以秒杀这种情况使用乐观锁+抢券失败的方案次才是比较好的解决方案,可以在一定程度上提供较好的并发能力,并且能保障并发安全。
3.3乐观锁的解决方案
乐观锁的关键就是判断之前查询得到的数据是否被修改过。
3.3.1版本号乐观锁的解决方案
使用版本号进行判断之前的数据是否被修改过。
版本号的解决方案,就是给需要进行改动库存的优惠券进行设置一个版本号,在查询库存的时候,需要将库存和版本号都查询出来,修改的时候携带版本号并且将库存和版本号都进行修改,如果是以前的版本号本线程就可以线程安全地修改成功,不是就修改失败。
3.3.2CAS法
CAS法其实就是用需要查询并修改的字段替代了版本号,实现了一个隐式的版本号,在修改的时候加一个where过滤条件,如果和之前的stock库存一致就进行扣减,如果不一样就扣减失败。
我们可以进行测试一下。
// 5. 扣减库存
boolean isSuccess = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", voucher.getStock()).update();
还是一秒钟打出200个并发。
这次虽然没有超卖,但是就成功了20次,这点成功率太低了,90%的失败率,我们的目标是50%的失败率!!!
3.3.3CAS法升级版本
使用stock判断和之前是否一致的失败率太高了,竟然达到了百分之90%,为了提高成功率,并且综合业务进行考虑发现,完全可以不去要求stock完全一致,只要stock > 0就可以了,有库存就行,没必要库存完全一致。
// 5. 扣减库存
boolean isSuccess = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();
测试一下:
嘿嘿,这次成功率达到百分之五十啦,也没有超卖问题,哈哈哈。
3.4总结超卖问题解决方案
悲观锁 => 简单粗暴,性能垃圾
乐观锁 => 性能优良,有成功率低的可能性,还是看如何设计实现乐观锁
4.秒杀实现一人一单的功能
4.1业务逻辑
秒杀优惠券肯定不能让一个人抢走的,肯定要进行实现一人一单的功能,我们可以先判断库存充足吗,如果充足就去根据用户ID和优惠券ID判断订单是否存在,存在就滚蛋,不存在就下单成功。
4.2业务实现
其实实现判断一个人是否下单过的代码很简单,但是如果保证高并发业务,并且实现优雅的事务是比较难的,典型的跑起来很容易,优化很难。
@Override
public Result seckillVoucher(Long voucherId) {// 1. 查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2. 判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀未开始!");}// 3. 判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已经结束return Result.fail("秒杀已经结束!");}// 4.1 判断库存是否充足if (voucher.getStock() <= 0) {// 库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}
}@Transactional
public Result createVoucherOrder(Long voucherId) {// 5. 一人一单Long userId = UserHolder.getUser().getId();// 查询用户是否下单过int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 下单过滚蛋if (count > 0) {return Result.fail("已经购买过啦!!!");}// 6. 扣减库存boolean isSuccess = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!isSuccess) {// 扣减失败return Result.fail("库存不足!");}// 7. 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1 订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2 用户IDvoucherOrder.setUserId(userId);// 7.3 代金券IDvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 8. 返回订单号return Result.ok(voucherOrder);
}
4.2.1为什么要加锁?
仔细想想,如果一个人够快是不是和库存超卖一样会出现相同的问题,一个人重复下单多次。
因为在新增订单之前,有多个线程都查询出没有下过订单了,所以我们需要进行加锁防止这种情况的出现。像这种情况,其实也可以通过乐观锁进行实现,但是黑马选择了悲观锁,那我们就用悲观锁先跟进一下。
这里我先列出了乐观锁和悲观锁实现业务的流程原理,可以简单看一下。
锁的实现很有意思,我们可以接着看锁的锁头和锁的范围,很有意思的。
4.2.1.1锁的范围的设置
为什么锁的范围是包住了整个创建订单流程?
创建订单的流程包括了:判断是否下单过,扣减库存,创建订单的流程。
包含整个查询是否下过单和创建订单的流程,一定可以保证一个人只能下单一次。
这个方法会进行返回一个被Result包裹的voucherOrder对象,由主函数进行返回,如果我们只给这个方法加锁,或者是在方法内进行加锁,当方法退出的时候,还没有进行返回的时候,其实那个过程还没有进行提交事务。
4.2.1.2锁对象的设置
为了控制锁的粒度,增加并发程度,并且一人一单防止一个人多次下单的这个功能其实就是进行保障一个人别多次并发即可,所以锁粒度只需要进行控制到每个用户,这个并发量是不是就上来了?这里进行使用的就是ID作为一个锁,但是我们不能进行使用Long对象直接作为锁。
每个ID都是一个对象,学过JAVA基础肯定知道,Long类型只有在-128 - 127是同一个对象,其它数字都是会进行新建一个对象的,所以如果直接使用Long对象就达不到给一个userID进行加锁的目的了,为了破局,我们可以进行使用JDK8及以后的StringTable的特性,userId.toString()是进行新建一个String对象(存储Long值)存储在堆上,然后进行调用intern,Intern这个函数很有意思,如果StringTable(字符串常量池)中没有对应的数据,就会在StringTable进行新建一个引用指向堆中的String对象,如果StringTable有这个引用(指向对应字面量值相同的对象),就返回这个引用即可。
userId.toString().intern()
4.2.2事务的设置
由于进行涉及到了库存扣减和创建订单,所以要给创建订单的函数进行创建一个Transitional事务。
4.3测试效果
准备1秒钟使用一个账户打出200个并发。
发现redis中原子计数器的值为1。
订单就增加了一条。
就减了一次库存。
5.分布式部署问题解决
如果我们采用分布式集群部署,那我们使用synchronized做重量级锁就可能出现很多问题。
5.1并发安全问题
如果我们在多台服务器进行部署就可能出现并发安全问题,因为synchronized是基于JVM的,多台服务器就相当于启动了多个独立的JVM,如果用户的一个请求通过Nginx反向代理到多个服务器的时候,就会导致出现问题,还是会出现并发安全的问题。
5.2分布式锁解决问题
5.2.1单机JVM的问题
多机部署的时候,使用单个JVM进行解决问题的时候,一个监视器只能进行监视一个JVM,多机JVM就无法进行锁了,这样就会出现多个并发请求同时在多台服务器上进行执行操作MySQL的情况,会导致大问题的。
5.2.2分布式锁解决单机JVM锁的我问题
使用分布式锁就可以解决单机JVM锁监视器的问题,多个JVM都使用这个监视器来进行同步执行,就保证了不会出现多台服务器同时执行锁请求的问题,可以使用这个分布式锁让多个服务器的JVM可以进行实现互斥锁的功能。
JVM的锁是单进程可见的,分布式锁是多进程可见的。
5.3了解分布式锁
5.3.1什么是分布式锁
分布式锁:满足分布式系统或者集群模式下多进程可见并且互斥的锁。
分布式锁要满足的特性:多进程可见,互斥,高可用,高性能,安全性。
5.3.2分布式锁的实现
分布式锁可以通过三种组件实现:MySQL,Redis,Zoookeeper。
使用Redis实现的分布式锁:高可用,高性能,利用锁的超时释放机制保障安全性,防止死锁。
5.4实现初版的分布式锁
5.4.1设计方案
获取锁:获取锁的时候使用SETNX lock 线程唯一标识,然后给锁进行设置时间,使用SETNX如果没有lock就会设置成功,有lock就会设置失败,在redis客户端如果设置成功就会返回,设置失败就会返回,在Java中使用Spring-Redis客户端进行操作的时候如果上锁成功就会返回true,上锁失败就会返回false。
释放锁:释放锁的时候可以支持手动释放锁,直接DEL key,也支持使用超时释放机制(通过给锁加时间限制实现)
但是上面使用SETNX设置锁,然后再使用EXPIRE给锁设置时间,这不是一个原子性操作,很有可能代码都执行完了,你还给它设置时间,会出现问题,而且这样也很麻烦,所以直接使用SET命令就可以,SET lock 线程唯一标识 NX EX time。
这样就保证了原子性,设置key-value并且设置了时间。
但是要注意一点,在Redis中,使用SET NX这种语法,如果设置成功就会返回OK,如果设置失败就会返回nil。
5.4.2代码实现
5.4.2.1定义通用接口
先进行定义一个通用接口,里面定义了tryLock尝试获取锁,unlock释放锁。
5.4.2.2小知识补充 => 包装类型拆箱时的注意点
IDEA在这里进行报黄警告了,为什么呢?因为方法如果返回的时一个包装类型,那么这个方法也很有可能进行返回一个null空指针,那就出现大问题了,拆箱的时候,空指针会抛出错误的!!!
看一下下面这个案例:
/*** 测试空指针拆箱*/
public class TestNull {public static void main(String[] args) {System.out.println(returnBool());}public static Boolean returnBoolean() {return null;}public static boolean returnBool() {return returnBoolean();}}
当我们尝试去拆箱返回打印的时候,JVM抛出了错误:
NullPointerException => 空指针异常!!!
正确的方式,调用Boolean.TRUE.equals(Boolean值)。
这样的话,如果返回的包装类型是null,也会被判定为false。
5.4.2.3实现完整代码
抓住几个重点说一下。
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;/*** 实现分布式锁工具*/
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(Long timeoutSec) {// 获取线程标识long threadId = Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
}
5.4.2.4为什么要进行设计让外部进行传入一个name和stringRedisTemplate进来?
传进来stringRedisTemplate主要是这个工具类没有交给SpringBoot进行托管,需要将StringRedisTemplate从外部注入进来,方便进行使用。
传入name主要是为了key进行使用的,主要是为了key的设计,这里key设计为lock:业务名称,为什么进行这样的设计呢?因为肯定不能所有业务用一把锁啊,这还得了?不同业务模块使用该工具类的时候,可以传入不同的业务名称,方便为不同的业务进行上锁。
5.4.2.5锁的设计
1.先分析一下这个命令的使用.
直接查他源码,opsForValue().setIfAbsent()的源码。
这个方法接收key,value, timeout,unit,可以在设置key-value的同时,设置key-value的过期时间,并且这个命令是走的redis指令的NX,返回值是一个Boolean包装对象,设置key-value成功就返回Boolean.TRUE,设置失败就返回Boolean.FALSE。
public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) {byte[] rawKey = this.rawKey(key);byte[] rawValue = this.rawValue(value);Expiration expiration = Expiration.from(timeout, unit);return (Boolean)this.execute((connection) -> {return connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent());}, true);
}
2.再探究一下这个value的设置,也是非常有意思
这个value设置为线程的ID了,目的是为了区分不同线程,目的主要是:当A线程执行的时候,如果A线程设置的锁超时释放了,其它的线程又进来了,如果线程A不进行判断就去将锁释放掉了,那不直接G了,你™释放的是自己的锁嘛?
所以要进行使用线程ID,让线程进行判断一下,这他妈的到底是你的锁吗?不是你的锁你释放你妈呢?你妈的,草泥马的是你的你再释放,不是你的你手脚老实点,别装B。
3.在分析一下KEY的设计
KEY的设计是lock:业务:userId,这样就可以将锁的粒度设置到每个用户啦,提高了并发度。
5.4.2.6锁的业务使用
也抓几个重点聊一下。
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 尝试加锁
boolean isLock = lock.tryLock(1200L);
// 处理加锁失败
if (!isLock) {return Result.fail("请求失败!!!");
}
try {// 处理加锁成功IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
} finally {// 释放锁对象lock.unlock();
}
5.4.2.7获取锁和释放锁
将加锁和处理加锁失败逻辑放在try-catch-finally外,只需要将处理业务的代码放在try-catch-finally中即可,无论业务执行成功与否,都可以进行使用unlock进行释放锁。
5.4.2.8不懂的现象
5.5思考分布式锁的问题是什么?
其实还是刚才分析果的问题,只是没有实现,当线程A因为不知道什么原因阻塞的时候,如果不加以判断这个锁是不是自己的,如果是自己才可以进行释放锁,不是自己的是无法进行释放锁的。
看一下新的业务逻辑:
其实就是在释放锁之前加入一个操作,判断一下锁的标识是不是自己的。
新的业务逻辑,就不用担心被其它线程把自己的锁解除啦!!!
5.6改进Redis分布式锁
5.6.1Redis分布式锁的改进
存入线程标识的设计 => 这个线程标识必须要是可以在不同的JVM之间进行区分的,这样才能做到分布式判断,因为不同的JVM生成的线程ID(Thread.currentThread().getId())可能会是相同的,所以这里进行使用UUID进行实现生成一个随机的ID,防止重复。
标识的判断 => 释放锁前得到lock对应的value,如果一致就释放锁,不一致就不释放锁。
5.6.2代码实现改进
我们还是抓几个重点进行阐述一下。
/*** 实现分布式锁工具*/
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";@Overridepublic boolean tryLock(Long timeoutSec) {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 获取到线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取到lock对应的valueString value = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 释放锁if (threadId.equals(value)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
5.6.3线程唯一标识的生成
为了保障分布式锁可以在多机JVM实现互斥的效果,最主要的是保障线程生成的唯一标识可以让多个JVM不会同时生成相同的标识,最终就能保证锁不会被其它线程给解除。
主要是进行使用UUID生成一个分布式ID,可以在一定程度上保障UUID不会生成重复的ID。
private final String ID_PREFIX = UUID.randomUUID().toString() + "-";
进行使用生成的UUID前缀+ThreadID进行生成唯一标识,作为分布式锁的value。
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
5.6.4释放锁的改造
在delete释放锁之前,我们需要进行使用进行对比一下当前线程的唯一标识和redis中存储的线程标识是否相等,因为目前这个前缀是一个成员变量,所以每个线程进行创建这个对象的时候,这个UUID都是和线程强绑定且唯一的。
先获取当前线程标识 => 获取redis中存储的锁对应的线程唯一标识 => 解锁。
@Override
public void unlock() {// 获取到线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取到lock对应的valueString value = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 释放锁if (threadId.equals(value)) {stringRedisTemplate.delete(KEY_PREFIX + name);}
}
5.7思考解锁时的问题
解锁时是两个操作redis的步骤的,如果在获取redis中存储的线程标识后,因为各种原因(例如垃圾回收器Stop The World,单机并发太猛导致CPU没有分配时间片,redis连接阻塞)都有可能导致线程阻塞,最终导致进行第二步操作的时候,还以为锁是自己的,就直接给解开了,所以我们需要保证整个解锁的过程都是原子性的,解锁你就直接执行完就可以了,不要分步骤去执行。
5.8Redis的Lua脚本
5.8.1什么是Redis的Lua脚本
Redis中是通过Lua脚本来进行保障Redis中多条指令的执行原子性的。
Redis中提供了Redis.call()调用函数去作为Lua脚本中进行调用执行Redis中的命令。
5.8.2.1无参EVAL
无参EVAL的使用就是EVAL "Lua脚本" 0 => 这个指令执行的就是一个无参数写死的LUA脚本指令。
5.8.2.2有参EVAL
有参EVAL的使用EVAL "Lua脚本" key的参数数量 key的参数 argv的参数,key的参数数量主要是进行判断后面参数列表中有多少个是key的参数,多少个是argv的参数。
key的参数使用KEYS[index],argv的参数使用ARGV[index],index从0开始。
有参EVAL可以进行接收参数,提高了Lua脚本的灵活性,不是一个写死的脚本文件。
5.9使用Lua脚本实现分布式锁
5.9.1Lua脚本的业务逻辑
使用Lua脚本使用原子判断 => 获取锁中的线程标识,判断标识是否一致,不一致则释放锁,不一致就什么都不做。
5.9.2RedisTemplate如何调用Lua脚本?
使用RedisTemplate的execute进行执行Lua脚本。
5.9.3Lua脚本原子性释放锁的实现
5.9.3.1编写Lua脚本
进行编写Lua脚本,Lua脚本接收一个KEY参数(接收分布式锁的KEY),接受一个普通参数用于对比(接收的是当前的线程标识),如果是当前线程去解除锁,就进行执行删除Key的指令。
--- 比较线程标识于锁中标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1])
end
return 0
5.9.3.2实现调用Lua脚本释放锁的逻辑
我们还是找几个重要的点说一下吧。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}@Override
public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
5.9.3.3静态资源的预加载
思考一下,什么时候去加载lua脚本合适呢?是在解锁的时候进行使用吗?当然不是,如果在unlock函数中去进行加载Lua脚本资源的话,就太扯淡了,这样每次unlock都得加载一遍,既浪费时间,又重复加载浪费性能,占用内存。
这个Lua脚本文件最适合的就是,在类创建的时候进行加载Lua脚本资源,这样就可以保证在每个JVM上,这个文件可以只进行加载一次,并且这个类是被ApplicationClassLoader进行加载的,是被GC Root永远强引用的,不会被垃圾回收,可以永远被调用,一次加载多次使用。
我们看一下RedisTemplate的execute函数的源码:
很容易我们就能发现,这个函数接收的是RedisScript类型的脚本文件,第二个参数是接收一个集合作为KEYS的参数,第三个参数是一个剩余参数,可以接收多个变量,作为Argv的参数。
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {return this.scriptExecutor.execute(script, keys, args);
}
我们先来进行构造RedisScript对象,这个对象需要进行构造加载脚本的位置和返回值类型,需要构造的很复杂,所以这种复杂的对象构造,建议在代码块中进行执行初始化,又因为构造的是static静态变量,所以我们在static代码块中进行执行初始化操作。
先创建一个DefaultRedisScript对象,然后使用ClassPathResource加载Lua脚本(文件这方面我是真的弱鸡),然后再进行设置返回值。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}
5.9.3.4Lua脚本执行原子指令
第二个参数也是设置的很有意思,使用的是Collections工具类提供的singletonList函数,将其中的一个元素转换为List。
这个脚本执行是原子性的,不会出现以前的那种问题。
@Override
public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
5.10Redis原生分布式锁的实现思路
set nx ex获取锁 => 设置过期时间(防止死锁),保存线程标识(防止其他线程解除我这个线程的锁,导致多线程重入)
set nx => 乐观锁的特性保证了互斥性。
set ex => 锁超时释放,防止死锁。
Redis集群保障高可用性。
但是怎么设置合理的锁超时释放时间呢?
6.Redisson
6.1SETNX分布式锁的问题所在
setnx实现分布式锁存在的问题:
不可重复:无法使用单独的redis指令实现单一线程重入锁。
不可重试:获取锁只能尝试一次,就返回,没有重试机制(其实可以进行使用自旋CAS实现的)
超时释放:会出现锁超时释放时间设计不合理的问题,还是会导致多个线程一起执行,还是会出现一个人多次下单的问题。
主从一致性:Redis集群主从同步存在延迟,如果不进行优化就会出现问题。
6.2初识Redisson
6.2.1什么是Redisson
Redisson其实就是一个基于Redis实现的JUC并发包,实现了JUC中的数据结构,一个分布式的JUC并发工具。
6.2.2Redisson入门配置
需要进行引入依赖和配置Redisson客户端。
测试Redisson的分布式锁:
6.3Redisson可重入锁原理
6.3.1Redisson的可重入锁
Redisson是支持可重入锁的,获取锁是通过计数进行实现,当第一次进行加锁的时候,也可以进行判断锁是不是自己的,原理是什么呢?是通过锁计数进行实现的,其锁的数据结构是一个hash结构,hash中有field和value相关字段。
先看一下整个锁的结构是什么样的,这样才能理解Redisson是如果实现的锁呢?
这个value的结构是一个Hash结构,其中有hash结构中存储了一个键值对,其中key进行存储了线程唯一标识,用于判断这个锁是属于哪个线程的,value存储的是锁计数,用于判断这个线程可以被重入几次,第一次上锁存储的就是1,以后再进行加锁就会继续自增在value上,释放锁的时候就是自减1,当锁计数自减到0的时候就会将锁释放掉。
6.3.2Redisson实现可重入锁的原理
Redisson实现可重入锁的原理是借助的Lua脚本,判断+加锁的过程是一个原子性的,来梳理一下整个流程。
获取锁的Lua脚本:key = KEYS[1]获取到锁key,然后在ARGV数组进行获取到threadId(线程唯一标识 => 主要是进行判断是不是自己的锁),还可以进行获取到锁的自动释放时间(主要是进行设置锁的时间的)。
整个流程
1.进行判断是否存在锁是否存在,如果不存在就进行获取锁和设置有效期,返回1。
2.如果锁已经存在了,就去判断一下锁线程标识,看看锁是不是自己持有的(标识是和线程进行绑定),如果标识是自己的就重入次数+1并设置有效期。
3.如果线程标识判断不是属于自己的锁,就不进行加锁,返回加锁失败。
6.4Redisson分布式锁原理
今天我们来梳理一下整个Redisson分布式锁的原理,分步骤地去研究一下Redisson锁的分布式原理。
6.5.1几个比较重要的点
Redisson的tryLock接收三个参数,waitTime => 获取锁等待时间,leaseTime => 锁释放时间,unit => 时间单位。
在源码中ttl如为null就意味着获取锁成功,如果获取锁失败ttl就会进行存储waitTime剩余等待时间。
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
6.5.2超时等待机制 - 锁重试机制
关键字:发布订阅,关键节点判断。
超时等待机制是Redisson的特性,是我们进行使用set nx ex无法进行实现的,开始获取锁之后就进行去判断是否获取锁成功,如果成功就会将锁返回null并赋值给ttl,如果获取锁失败就会将剩余等待时间赋值给ttl,进入超时等待机制。
先判断ttl(剩余时间)是否大于0,如果已经等够了,妈的要死了,就返回false,滚球了。
如果还没等够,就去进行使用发布订阅机制,去订阅持锁线程的释放锁事件进行等待。
当持锁线程释放锁成功后就会发布事件,这是一个广播机制,广播机制会通知所有订阅了事件的线程,重新去抢锁。
被事件叫醒了之后先去判断一下是否超时了,超时了就去返回一个false,没超时就回到一开始去尝试获取锁(使用的是递归)。
6.5.3看门狗机制
Redisson使用WatchDog看门狗机制进行实现的永久锁,WatchDog看门狗机制就是利用的续期机制,当锁快过期的时候进行一波续期,保证了锁永远不会过期。
如果获取锁成功啦,就会进入下一个关键节点,判断锁的释放时间,如果存在释放释放时间,就直接给锁设置释放时间,进行返回即可true,如果锁释放时间为-1(即为永久),就去启动看门狗机制,然后返回true,使用看门狗机制实现永久锁。
看门狗只会在获取到锁之后才能进行开启,在释放锁的的时候会将看门狗关闭。
6.5.3释放锁机制
释放锁的时候会去通知所有订阅锁事件的线程去抢锁,并且会取消看门狗机制(如果是永久锁就会取消)
6.5.4Redisson分布式锁原理
6.5Redisson分布式锁主从一致问题
6.5.1问题再现
这个问题主要在Redis集群中出现,Redis集群会进行搭建主从节点,主节点负责进行写入,从节点负责进行读取,主节点和从节点进行主从同步,主节点将写入的内容同步到从节点。
但是这个问题主要是出现在,当主节点失效后,从节点晋级为主节点之后,如果此时Java应用启动的分布式锁在已经宕机掉的Redis中还没有同步到从节点中,就会出现锁失效现象,导致锁依然可能被重入。
6.5.2问题解决 => multiLock
这个问题使用multiLock就可以进行解决,redis集群搭建采用多集群方式,多个主节点,Java应用获取锁的时候同时向多集群进行上锁,释放锁的时候同时释放多集群的锁,只有多集群的锁获取/释放成功,才算获取/释放锁成功。
这个了解即可,用到多集群的时候再研究。
6.6三种锁总结
6.6.1分布式锁的缺点
缺点很明显,就是超时时间你根本设置不明白,你到底怎么设置一个合理时间呢?像黑马一样使用Redisson的WatchDog机制?如果设置一个永久锁,其实确实可以在一定程度上解决这个问题,但是这也会导致如果用户请求线程出现问题,用户出现问题的线程会一直卡住,锁一直不过期,也会导致用户的使用出现问题,用户再去请求的时候都会因为卡住的线程没有释放锁而导致后续请求全部失败。如果不适用看门狗机制,线程出现问题之后,一直卡住,如果此时锁被超时释放了,那肯定就会出现一个人重入的情况,无法保证一人一单,所以我不建议适用分布式锁。
6.6.2不可重入Redis分布式锁
7.秒杀优化
7.1Redis同步秒杀的问题 => 长耗时问题
7.1.1分析长耗时问题
为什么会出现长耗时的问题:主要是整个流程是一个同步的历程,并且涉及到大量的数据库操作,还有大量加锁解锁的操作,所以耗时操作比较多,所以整体存在长耗时问题。
查询优惠券(数据库操作)=> 判断秒杀库存 => 分布式锁加锁(Redis操作) => 查询订单(数据库操作)=> 校验一人一单 => 扣减库存(数据库操作)=> 创建订单(数据库操作)=> 分布式开锁(Redis操作)
7.1.2测试长耗时接口
虽然这个长接口仅仅有23ms,但是虽然并发量的提高和网络部署(目前网络IO都在本地,速度很快),网络IO成本和系统响应成本提高之后,就会导致整体接口的耗时增加,预计会达到上百ms,说明这是一个长耗时接口。
7.2Redis优化秒杀思路
7.2.1架构分析
主要是进行将判断秒杀库存和校验一人一单的功能进行放入到Redis中,Redis进行校验完之后,就进行优惠券ID,用户ID,Redis生成的ID存入到阻塞队列中,调度线程池去阻塞队列中读取任务数据,进行创建订单操作(反正只要Redis校验成功之后一定会抢单成功)
7.2.2整体业务逻辑
1.新增秒杀优惠券的同时 => 在优惠券添加的同时,将优惠券信息保存到Redis中。
2.基于Lua脚本,判断秒杀库存,一人一单,决定用户是否抢购成功,抢购成功再进行扣减库存(校验功能最好一次性完成)
3.如果抢购成功 => 将优惠券ID,用户ID,订单ID(使用Redis生成)封装到阻塞队列中。
4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能。
7.3实现存入校验数据&Lua脚本校验&异步下单
7.3.1实现添加优惠券时存入数据
KEY的设计 => seckill:stock:优惠券ID,Value => 优惠券库存值。
在新增优惠券的时候,将优惠券ID和stock库存存储到Redis中。
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
7.3.2Redis校验Lua脚本
7.3.2.1为什么使用Lua脚本
Lua脚本主要是进行校验和扣减库存的,这个步骤是一个要多次操作的步骤,为了防止各种出错同时也为了减少网络IO(如果使用多次redis获取校验操作必须加分布式锁而且要多次网络IO,太麻烦了),直接进行使用Lua脚本,就可以完成这个一次性操作。
库存的校验是通过key-value存储了一个数据。
使用一个名为order的List进行存储用户ID,下过单的用户都会存储List中,下单成功就将用户加入到这个List,下单前去这个List中查询是否下过单,进行保证一人一单。
7.3.2.2业务流程
1.先进行判断库存是否存储,是就进入判断用户是否下单,否就结束并返回1。
2.再进行判断用户是否下单,否就进行扣减库存添加任务,否就结束进行返回2。
3.然后通过前两个校验,进行扣减库存,添加任务,返回0。
7.3.2.3编写Lua脚本
使用Lua脚本进行做校验库存和下单用户以及扣减库存和保存下单用户的操作。
-- 1.获取参数
-- 1.1 优惠券ID
local voucherId = ARGV[1]
-- 1.2 用户ID
local userId = ARGV[2]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. userId-- 3. 脚本业务
-- 3.1 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足, 返回1return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey)) then-- 3.3 存在,说明是重复下单,返回2return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下单 (保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)return 0
7.3.3准备阻塞队列和线程池任务
设计阻塞队列主要进行存储任务,设计线程池主要是为了处理任务。
阻塞队列就是用来进行存储任务的容器,可以在拿任务和装任务的时候产生阻塞的效果。
线程池主要是进行复用线程资源,将线程创建调用,管理的这些细节交给线程池处理。
private static BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();// 2. 创建订单handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.error("处理订单出错, time: {}", System.currentTimeMillis());}}}
}
7.3.4吐槽黑马垃圾代码
黑马为了模拟场景而写的代码属实垃圾,首先,我先吐槽一下它这个垃圾组合,ArrayBlockingQueue配合Executors.newSingleThreadExecutor(),什么垃圾组合,干啥呢?还在Bean创建的回调的时候将任务提交给队列。
去思考一下整个逻辑是在干什么:业务逻辑是在做什么?是为了异步创建订单。怎么达到异步化?开一个新线程进行处理。怎么让开启的异步线程更容易进行管理?使用线程池。任务如何持久化呢?存储到阻塞队列中。
黑马怎么做的:定义了一个死循环任务,开启了一个单线程的线程池:Executors.newSingleThreadExecutor(),定义了一个阻塞队列ArrayBlockingQueue去存储任务,在Bean被加载的时候,就进行将这个死循环任务提交给SingleThreadExecutor进行执行,由于take会导致阻塞,所以说会一直卡那里,就不会一直占用CPU资源了。
问题:1.我不知道黑马怎么想的,整一个线程的线程池在这里扯淡,如果突然高并发抢单,你一个线程得执行多久呢?订单创建延迟又有多久呢?能保证准时顶点下单吗?效率是不是很低呢?2.第二个就是,我实在是想不明白,线程池里可以定义阻塞队列,他完全可以在Redis执行完Lua脚本进行返回之后,去用线程池去执行任务,如果有空闲线程就立刻执行,如果没有空闲线程就放在线程池中定义的阻塞队列中,线程池中的阻塞队列放不下了,临时线程也没了,完全可以触发RejectedTaskException,调度任务拒绝策略,它现在是在干啥呢?
总结就是黑马为了讲Redis中Stream实现消息队列,而硬憋出来的垃圾代码,狗屎就是。
7.3.5完成RedisLua脚本执行完后存入数据
发现进行执行完Lua脚本通过校验了之后,就代表有购买资格了,有购买资格之后,调用Redis去生成订单ID,然后新建一个任务对象(使用优惠券对象代替),将创建的任务对象放入到阻塞队列中。
@Override
public Result seckillVoucher(Long voucherId) {if (proxy == null) {proxy = (IVoucherOrderService) AopContext.currentProxy();}// 获取用户Long userId = UserHolder.getUser().getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());// 2.判断结果是0int r = result.intValue();if (r != 0) {// 2.1 不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}// 2.2 为0, 有购买资格, 把下单信息保存到阻塞队列VoucherOrder voucherOrder = new VoucherOrder();// 2.3 生成订单IDlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.4 用户IDvoucherOrder.setUserId(userId);// 2.5 代金券IDvoucherOrder.setVoucherId(voucherId);// 2.6 放入阻塞队列orderTasks.add(voucherOrder);return Result.ok(orderId);
}
7.3.6处理订单的逻辑
其实处理订单的逻辑没太多好说的,主要从几个点进行分析一下:1.为什么加锁?2.调用
// 处理订单方法
private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1. 获取并设置用户IDLong userId = voucherOrder.getUserId();// 2. 创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);// 3. 获取锁boolean isLock = lock.tryLock();// 4. 判断是否获取锁成功if (!isLock) {// 获取锁失败, 返回错误或者重试log.error("不允许重复下单");return;}try {proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}
}·
8.秒杀最终优化 => Redis消息队列
8.1Redis消息队列
消息队列:存放消息的队列,可以进行管理消息,一般被叫做消息代理。
生产者:发送消息到消息队列。
消费者:从消息队列中获取消息并处理。
在当前秒杀业务中,生产者主要是负责创建订单任务,并将任务提交到任务队列。
生产者创建消息的过程是这样的:判断秒杀时间和库存 => 校验一人一单 => 使用优惠券ID,用户ID,订单ID创建任务,将任务作为消息发送给Message Queue。
消费者负责去消息队列中接收消息(任务),并进行相应的处理。
Redis:
1.List结构:基于List结构模拟消息队列。
2.PubSub:基本的点对点的消息队列模型。
3.Stream:Redis官方实现的比较完善的消息队列模型。
8.2基于List模拟消息队列
8.2.1为什么List可以模拟消息队列
消息队列:存放消息的队列。
List的特性:双向链表,可以进行模拟出入队列的效果。
List模拟消息队列:1.LPUSH和RPOP,左入右出。2.RPUSH和LPOP ,右进左出。但是没有消息阻塞的模式哦,如果取出数据的时候,队列中没有数据,就会返回一个null,并不会出现JVM阻塞队列的效果。
阻塞式进行取出数据可以进行使用BRPOP/BLPOP,这样都是阻塞式的API。
8.2.2List模拟消息队列的优点
1.使用Redis存储,不占用JVM的内存 => 我感觉这个怎么说呢,也是一个问题啊,不占用JVM内存,但是占用Redis的内存啊,这样也会右一定的限制问题。
2.List具有Redis的持久化机制,数据安全有保障,利用Redis的持久化机制,即使整个系统宕机了,也可以存储下来宕机前的数据,在机器恢复启动的时候,数据不会丢失,会恢复。这样就保障了消息不会丢失。
3.可以保证消息的有序性 => 链表可以遵循FIFO的规则,也可以遵循LIFO的规则,反正就是可以保证有序。
8.2.3List模拟消息队列的缺点
最大的问题就是,无法避免消息的丢失,比如说如果从队列中进行取出数据,在取出数据后,突然宕机了,那么消息就随着JVM的宕机而丢失了,因为取出消息是remove And get,取出成功后消息也就随之丢失了。
List无法实现一条消息被很多消费者消费的情况,一条消息仅仅支持一个消费者进行消费。
8.3基于PubSub(发布订阅)的消息队列
8.3.1为什么PubSub适合作为消息队列?
PubSub(发布订阅):消费者可以订阅一个或者多个channel(也就是可以进行同时订阅多个生产者的消息管道),生产者向消息管道进行Pub消息之后,所有订阅这些Channel的消费者都可以收到消息。
使用PubSub机制可以同时让多个消费者接收到消息,生产者发布消息 => 多个监听channel的消费者就可以同时接收到消息。
PubSub(发布订阅)API:
1.SUBSCRIBE(subscribe)channel [channel] => 订阅一个或者多个频道。
2.PUBLISH(publish)channel msg => 向一个channel进行发布消息。
3.PSUBSCRIBE(psubscribe)pattern[pattern] => 订阅与pattern格式匹配的所有频道。
8.3.2进行测试PubSub(发布订阅)模式
进行测试subscribe订阅channel。
进行测试publish发布消息到channnel。
8.3.2基于PubSub的消息队列的优缺点
8.3.2.1PubSub的优点
采用发布订阅模型,支持一个channel对应多个生产者和消费者。
8.3.2.2PubSub的缺点
不支持数据持久化(因为发布之后,如果订阅者不去及时接收消息,宕机了之后,就会造成消息丢失)
消息发布后,没有及时去接收,就会出现消息丢失。
消息堆积有上限,因为如果消费者同时去监听了多个channel,多个channel同时发布消息的时候,生产者只能一个一个消息的接收,在这期间没有进行接收的消息是堆压在消费者的缓存处的,缓存总有一天会满的,满了之后超时没有获取到,就会出现丢失消息的现象。
8.4基于Strean的消息队列
8.4.1为什么使用Stream作为消息队列
Stream是Redis5.0引入的一种新的数据结构,用于进行实现一个功能相对完善的消息队列。
官方也建议使用Stream作为消息队列进行使用。
8.4.2发布消息的命令
Stream使用XADD进行发布消息。
看一下XADD的用法:
XADD key(Stream的名称) [NOMKSTREAM(如果队列不存在,是否进行自动创建,默认是自动创建) MAXLEN(设置消息队列的最大消息数量,可以使用*标识,*代表的就是这个队列是无限长度的) [消息的唯一ID](默认由Redis进行自动生成,格式是时间戳-递增数字,如果进行自定义,建议是一个独一无二不会重复的值) 队列中的消息(消息存储的是Enrty,格式就是多个key-value键值对)
一般来说NOMKSTREAM都不用进行设置,不存在直接进行自动创建就行。
消息唯一ID直接让Redis进行按默认方案生成就行了。
所以一般来说都是使用XADD user * name jack age 21,这种最简版的方式进行创建即可。
测试进行添加消息:
使用指令XADD key(指定是添加到哪个队列中) * 消息键值对。
查看添加到系统中的数据:
8.4.2.1读取消息的方式一 => XREAD非阻塞式
读取消息的基本方式 => 使用XREAD非阻塞式读取的方式。
XREAD [COUNT count(每次进行读取消息的数量最大值)] [BLOCK milliseconds(当没有消息的时候,是否阻塞,阻塞的时间是多久,单位为毫秒)] [STREAMS key [key...](从哪个队列中读取消息,key就是队列名)] [ID []ID ...](起始ID,只返回大于该ID的消息,0代表从第一个消息开始,$代表从最新的消息开始)]
基本上非阻塞式读取使用的指令就是:
1.从第一条数据开始读取:XREAD COUNT count SRTEAMS key(当然也可以) 0
2.从最新的数据源开始读取(只能读取未读过的):XREAD COUNT count STREAMS key $
要点:使用$进行读取最新的数据必须要配合阻塞机制才能实现。
进行读取user队列中哪第一条数据:
读取出来的的数据为:Stream队列名称,消息ID,消息的key和value。
可以发现Stream中的数据是可以重复读取的:
非阻塞状态下使用XREAD + $读取最新的数据,会读不到数据,返回nil:
8.4.2.2读取消息的方式二 => XREAD阻塞式
读取消息的基本方式二:通过XREAD阻塞式读取数据。
Redis命令:XREAD COUNT count BLOCK millisecounds(阻塞毫秒值) STREAMS key id/$/0。
经常使用的就是使用阻塞式读取的方式,尝试去读取Stream消息队列中最新的数据:XREAD COUNT count BLOCK millisecounds STREAMS key $
进行使用阻塞等待十秒方式:XREAD COUNT count BLOCK 100000 STREAMS key $
在阻塞期间,只要STREAM队列中有消息进入,就会接受到消息。
要点:将ID设置为$进行获取最新数据的方式,只能通过XREAD的阻塞读取的方式才可行。
8.4.2.3JAVA代码实现XREAD阻塞读取数据的不可行方式
使用循环阻塞的方式去取消息,保证每次取到的消息都是最新的消息。
但是这种方式有很大的问题,如果redis执行指令获取到消息之后,在处理消息的过程中,来了新的消息,代码处理完消息,使用这行redis指令的时候,就取不到刚刚来的消息了。
private static class Redis {public Object execute(String order) {System.out.println(order);return "执行成功";}
}@Test
void testXRead() {Redis redis = new Redis();while (true) {// 尝试读取队列中通的消息, 最多阻塞2秒Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");if (msg == null) {continue;}System.out.println("处理消息");}
8.4.2.4STREAM类型消息队列的XREAD命令特点
1.消息可回溯(STREAM中的消息,使用XREAD读取可以多次读取)
2.一个消息可以被多个消费者读取(一条消息可以被读取多次)
3.可以被阻塞读取(XREAD提供了阻塞读取的命令)
4.有消息漏镀的情况(使用循环阻塞读取最新消息,处理的时候,可能会存在消息漏镀的情况)
8.5基于Stream消费者组实现消息读取
8.5.1消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
8.5.1.1消息分流
队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息的处理速度。
要点:消息分流给多个消费者,加快消息处理速度。
8.5.1.2消息标识
消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费。
要点:消费者组维护了一个标识,标识出队列中最后一个消息处理到什么位置了,保障消息都会被处理。
8.5.1.2消息确认
消费者获取到消息后,消息处于pending状态(待确认的状态),并存入到pending-list中(等待队列中)。当处理完成之后需要通过XACK来进行确认消息,标记该消息已经被处理,才会从pending-list移除。
要点:增加了消息确认机制和消息待确认队列,保证消息一定会被处理。
8.5.2消费者组相关的API
消费者组的相关API其实只有一个指令,就是XGROUP。
看一下整个XGROUP指令:
XGROUP [CREATE KEY groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTORY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
XGROUP消费者组指令可以进行创建/删除消费者组,从消费者组中添加/删除消费者。
8.5.2.1创建消费者组
创建消费者组:
XGROUP CREATE key groupName ID|$ MKSTREAM
key:STREAM队列名称
groupname:消费者组名称
ID:起始ID标识,$代表队列中最后一个消息(即最新的消息),0代表的是队列中的最后一个消息。
MKSTREAM代表的意思就是,如果Stream不存在,就会创建出来一个Stream消息队列进行使用,如果不添加这个参数,就会抛出错误。
要点:消费者组是基于STREAM队列中的数据建立的消费者联盟,多个消费者同时去监听同一个STREAM消息队列。
8.5.2.2删除消费者组
删除消费者组:
XGROUP DESTORY key groupName
删除key对应的Stream消息队列中的名字为groupName的消费者组。
8.5.2.3给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
为指定Key对应的Stream中的名字为groupName的消费者组,添加名字为consumerName的消费者。
要点:使用这个指令的时候就会自动创建消费者。
8.5.2.4删除消费者组中指定的消费者
XGROUP DELCONSUMER key groupName consumerName
为指定Key对应的Strea中名字为groupName的消费者组,删除名字为consumerName的消费者。
8.5.2.5从消费者组读取数据
XREADGROUP GROUP group consumer [Count count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费者组名称
consumer:消费者名称,如果消费者不存在,就会自动创建一个消费者
count:本次查询的最大数量(限制查询数量)
BLOCK milliseconds:当没有消息的时候,最长阻塞时间。
NOACK:无需手动ACK,获取到消息后自动确认。
STREAMS key:指定队列名称(从哪个Stream队列中的哪个消费者组中进行读取数据)
ID:获取消息的起始ID:
">":从下一个未消费的消息开始。
"其它":根据指定的ID从pending-list(待确认的消息队列)获取已笑给但并未确认的消息,例如:0。是从pending-list中的第一个消息开始。
8.5.2.6确认消息
确认消息使用的是XACK key group ID [ID...]
key代表的就是对应的Steam消息队列。
group代表的就是对应的消息队列中的消费者组。
ID就是要被确认的消息的ID。
8.5.2.7获取消费者组中对应的未确认的消息
获取未确认的消息的指令就是使用的是XPENDING group [[IDLE min-idle-time] start end count [consumer]]
group代表的就是对应的消费者组。
IDLE就是进行指定进入pending-list中的时间范围
start就是指定开始位置 -:代表的是最开始的一个数据
end就是指定结束位置 +:代表的是最后一个数据
count就是指定读取数据的数量
consumer就是指定消费者
8.5.3消费者组的实战操作
8.5.3.1初始化一个Stream队列
使用XADD指令进行初始化一个Stream队列。
8.5.3.2进行创建一个消费者组
使用XGROUP CREATE s1 g1 0 => 创建一个关于s1的Stream队列,从第一个消息开始读取的消费者组。
8.5.3.3进行使用消费者组读取指定消息队列中的数据
使用指令:
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
使用消费者组g1中的名称为c1的消费者去阻塞式的读取名称为s1的STREAMS中的数据。
使用指令:
XREADGROUP GROUP g1 c2 COUNT 1 BLOCK 2000 STREAMS s1 >
使用消费者组g1中的名称为c2的消费者去阻塞式的读取名称为s1的STREAMS中的数据。
8.5.3.4进行确认消费者组消费的消息
XACK s1 g1 ID... => 进行确认Stream队列名称为s1,消费者组名称为g1,消息ID为...的消息进行确认。
确认消息后,会将确认成功的消息数量返回。
8.5.3.5进行查看队列中所有未确认的消息
使用XPENDING s1 g1 - + 10查看消息队列名称为s1,消费者组为g1中的PendingList中的数据。
输出的数据是消息ID,消费者名称,最后一次被读取截止到当前的时间,被消费者读取的次数。
8.5.3.6从待确认队列(PendingList)中读取数据
使用XREADGROUP GROUP g1 c2 COUN 1 BLOCK 2000 STREAMS s1 0
这句话的意思是,从pending-list中读取,Stream为s1,消费者组为g1,消费者为c2的未确认的数据。
发现将消息ACK确认之后,就从pending-list中移除了·。
8.5.4使用JAVA伪代码实现消费者组的消息消费
8.5.4.1前置准备
一个Redis伪类,主要是用于进行模拟Redis发送指令。
private static class Redis {public Object call(String order) {System.out.println("执行: " + order + ", 成功。");return "我是查询的消息数据";}
}
一个定义的异常,主要是当处理消息的函数没有正常的去ACK消息时,抛出这个异常,告诉上层要进行的处理这个异常(例如从PendingList中读取未确认的消息,再次去处理)
// 处理消息时出现异常抛出的异常
private static class HandlerMsgException extends RuntimeException {public HandlerMsgException() {super();}public HandlerMsgException(String msg) {super(msg);}
}
定义的处理消息的函数:
// 处理消息的函数
private void handleMessage(Object msg) throws HandlerMsgException {double random = Math.random() * 10;if (random < 5) {System.out.println(msg);} else {throw new HandlerMsgException((String) msg);}
}
8.5.4.2测试去队列中取出数据
1.先调用XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMES s1 >从Stream消息队列中获取数据。
2.判断如果拿不到数据怎么办,其实可以线程先等待一会,然后再去自旋继续去队列中获取数据。
3.进行调用处理消息的函数handlerMessage
4.如果抛出了未正常ACK消息的HandlerMsgException异常,就进行处理异常。
5.循环调用XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0,从待确认队列(pending-list)中取出消息数据,再去处理,如果不行,就继续自旋去待确认的队列中取数据进行处理。
@Test
void testXReadGroup() {Redis redis = new Redis();while (true) {// 尝试监听队列, 使用阻塞模式, 最长等待 2000 毫秒Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");// 压根拿不到数据时怎么办if (msg == null) {continue;}try {// 处理消息的函数 处理完消息一定要进行ACKhandleMessage(msg);} catch (HandlerMsgException e) {while (true) {// 尝试从pending-list中取出数据msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0");// null就代表没有异常消息了, 所有消息都已经被确认了if (msg == null) {break;}try {// 说明还有没有确认的消息 再次进行处理handleMessage(msg);} catch (HandlerMsgException exception) {// 再次出现异常, 记录日志, 空袭西门家吗log.error("处理未ACK的数据失败", exception);}}}}
}
8.5.4.3消费者组XREADGROUP命令特点
1.消息可回溯 => 消息不会从Stream中移除。
2.可以多消费者(一个消费者组中有多个消费者)争抢消息,加快消息的处理速度。
3.可以阻塞读取 => 支持使用BLOCK阻塞读取。
4.没有消息漏读的风险 => 存在消息标记,会记录上一次处理的消息,宕机重启后也会借助持久化机制恢复消息标记。
5.有消息确认机制,保证消息至少被消费一次(提供了pending-list待确认消息队列和ACK消息确认机制,保证消息至少被消费一次)
8.6基于Redis的Stream消息队列优化秒杀
8.6.1需求分析
1.创建一个Stream类型的消息队列,名称为stream.orders。
由于这个消息队列是一行创建的产物,队列不会进行多次创建,所以直接在Redis客户端提前创建好即可,无需放在Java代码中再进行创建了。
2.修改之前的秒杀下单Lua脚本,在认定有抢购资格之后,直接向stream.orders中添加消息,内容包括vouvherId,userId,orderId。
以前由于使用的是Java的消息队列,需要认定有抢购资格后,通过Lua脚本将锁解开后,最终在Java代码中向任务队列投递任务对象。
但是由于Redis消息队列的特性,我们完全可以将创建任务对象,并向消息队列中投递任务的这个过程,直接放在Lua脚本中进行,这样就减少了一次Java代码和Redis的交互(因为如果任务对象在Java代码中进行封装,然后在Java代码中将任务对象进行提交到Redis的Stream消息队列中,这样就增加了一次与Redis的网络交互,如果直接在Lua脚本中进行运行,那么一次网络交互就可以了)
8.7.创建Redis中的Stream消息队列
XGROUP CREATE stream.orders g1 0 MKSTREAM进行为名字为stream.orders的消息队列创建名为g1的消费者组。