【从零开始学习Redis】如何设计一个秒杀业务
优惠券秒杀
全局唯一ID
当用户抢购时, 就会生成订单并保存到tb_voucher_order这张表中, 而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
这里使用到了全局ID生成器
ID的组成部分:
- 符号位:1bit, 永远为0
- 时间戳:31bit,以秒为单位, 可以使用69年
- 序列号:32bit, 秒内的计数器, 支持每秒产生2^32个不同ID
全局唯一ID生成策略:
- UUID
- Redis自增
- snowflake算法
- 数据库自增
Redis自增ID策略:
- 每天一个key, 方便统计订单量
- ID构造是 时间戳 + 计数器
@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1672531200L;/*** 序列位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix){// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timeStamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 自增long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回return timeStamp << COUNT_BITS | count;}}
实现秒杀下单
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
实现类代码如下:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 根据优惠券id查询秒杀信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 判断当前时间是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 判断当前时间是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已结束return Result.fail("秒杀已经结束!");}// 查询优惠券是否还有库存if (voucher.getStock() < 1) {return Result.fail("库存不足!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!success) {return Result.fail("库存不足");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 1.添加订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.下单用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 3.优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 返回订单idreturn Result.ok(orderId);}
}
库存超卖问题
假设当前库存为1,在高并发的情况下,如果同时查询到当前还有库存,都会执行扣减库存的操作。其实这就是常见的线程安全问题。我们首先想到的就是用锁,但是具体要怎么做呢?
悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
- 例如Synchronized、Lock都属于悲观锁
乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
- 如果没有修改则认为是安全的,自己才更新数据。
- 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁的关键是判断之前查询到的数据有没有被修改,常用的方法有两种:
版本号法:
每次扣减库存完成时,version都要同步更新,另一个线程只要当前version和之前查询到的不一致,那么不会扣减库存。
CAS法:
CAS法是基于版本号法的简化版本,因为每次执行扣减库存都要更新版本号,那么可以直接用库存的数量代替version,如果某个线程在执行扣减库存时,当前stock与查询库存时的值一致,就说明没有其他线程修改当前数据。
但是乐观锁的方法会导致失败率大大提高,可能达到90%的失败率。
原因还是在于同时大量请求抢购优惠券,但是在第一个线程修改了库存之后,后面的线程发现当前库存与自己查询得到的不一致,所以会认为自己“多买了”,抢购就会失败,从而导致大量请求失败。
我们的优化思路也很简单,将扣减库存的业务逻辑改为如果库存大于等于0就可以抢购到。这样就可以保证避免超卖的问题了。
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
总结
超卖这样的线程安全问题,解决方案有哪些?
- 悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
- 乐观锁:不加锁,在更新时判断是否有其它线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
实现一人一单功能
虽然超卖问题解决了,但是存在一个人获取好几单优惠券的问题,所以我们要实现一人一单功能。
这里将一人一单、查询订单、扣减库存封装起来。我们现在的逻辑是先根据userId
和voucherId
查询订单的数量,如果订单大于0,说明已经优惠券已经买了一单了,就不该再买了,所以一人一单。但还是那个问题,这是在线程并发执行的情况下,如果查询到当前订单数为0,会同时购买优惠券,又导致一个人还是买了多单。所以这里乐观锁无法满足我们的需求,必须使用悲观锁。
使用悲观锁时也要注意,我们需要把锁加到用户id上,我们要锁住用户id,只允许一个线程进入,而且封装的操作作为事务,下单、扣减库存、生成订单要么全部成功,要么全部失败。所以锁必须加在事务的外面,如果锁加在事务内部,一旦执行完,锁unlock
,而事务还未提交仍然会发生线程安全问题。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {// 根据优惠券id查询秒杀信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 判断当前时间是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 判断当前时间是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已结束return Result.fail("秒杀已经结束!");}// 查询优惠券是否还有库存if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {// 一人一单Long userId = UserHolder.getUser().getId();// 1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {return Result.fail("库存不足");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 1.添加订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.下单用户idvoucherOrder.setUserId(userId);// 3.优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 返回订单idreturn Result.ok(orderId);}
}
集群下的线程并发安全问题
在集群下的线程又发生了并发安全问题,synchronized
并没有锁住,两个端口同时请求又发生了一人多单问题。
单机下线程执行情况:
集群下线程执行情况:
每个JVM有一个锁监视器,左将线程1加锁,而右在他的JVM中将线程3加锁,这就导致两个端口重复了操作。所以需要加跨JVM的锁。
分布式锁
满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁工作原理
分布式锁的核心是实现多进程互斥,常见的有三种:
基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法
- 获取锁:
- 互斥: 确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
# 添加锁,NX是互斥、EX是设置超时时间
SET lock threadl NX EX 10
- 释放锁:
- 手动释放
- 超时释放: 获取锁时添加一个超时时间
# 释放锁,删除即可
DEL key
实现Redis分布式锁版本1
利用Redis的setnx实现分布式锁工具类
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识long threadId = Thread.currentThread().getId();//尝试获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}
实现类如下:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result seckillVoucher(Long voucherId) {// 根据优惠券id查询秒杀信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 判断当前时间是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 判断当前时间是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已结束return Result.fail("秒杀已经结束!");}// 查询优惠券是否还有库存if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//获取锁boolean isLock = lock.tryLock(5L);//判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {// 释放锁lock.unlock();}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {// 一人一单Long userId = UserHolder.getUser().getId();// 1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {return Result.fail("库存不足");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 1.添加订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.下单用户idvoucherOrder.setUserId(userId);// 3.优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 返回订单idreturn Result.ok(orderId);}
}
Redis分布式锁误删问题
如图,线程一获取了锁,但是发生了阻塞,阻塞过长时间,导致key超时,导致锁被释放。与此同时,线程二获取当前锁,执行业务逻辑,可是此时线程一业务完成并执行unlock释放锁,导致线程三进入,这么一来,又导致了线程的并发执行。
如何改进呢?
我们可以在每次释放锁的时候都比较一下锁的标识,我们需要创建锁对象时就生成某种标识,并且这是唯一的,只需要比较当前锁标识与创建锁时标识是否相同,相同则释放锁,否则不做处理。
灵魂二问:
- 在如下的代码里这个
order + userId
,这不就是现成的标识吗,也确实是唯一的,那我不就可以使用这个当作标识吗?
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
其实这是错的,这么做会发生什么问题?
我们来到SimpleRedisLock
方法的内部,
private String name;
private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;
}private static final String KEY_PREFIX = "lock:";@Override
public boolean tryLock(Long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//尝试获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);}
可以看到这里的"order" + userId
,实际上是这个锁的key,用于保证每个用户只能同时抢一次。可是现在把锁的key当作锁标识,就会导致同一用户的所有线程都可以获取锁释放锁,相当于共用锁,相当于没加锁,因为每个人都有并且相同。
- 一个人不是只能请求一单吗,为什么会有多个线程?
- 用户点击多次 / 重复请求
用户可能点了两次「立即抢购」按钮,或者网络抖动导致浏览器重试。
-
请求 A 和 请求 B 几乎同时到达后端,都会进入到
seckillVoucher()
方法。 -
这两个请求是两个不同的线程,但
userId
相同,所以会共用同一个锁 key:order:123
。 -
** 分布式部署**
服务可能部署在多台机器上(比如 3 个应用实例),
- 用户请求可能被负载均衡分配到不同机器。
- 每台机器都会有一个线程处理这个用户的请求。
- 这些线程虽然在不同 JVM,但操作的 Redis 是同一个,所以 key 必然相同:
order:123
。
关键就是这句话,线程在不同的JVM,但是操作的Redis是同一个。
需求:修改之前的分布式锁实现,满足:
- 在获取锁时存入线程标示(可以用UUID表示)
- 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "—";@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//尝试获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标识是否一致if(threadId.equals(id)){//释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
分布式锁的原子性问题
误删问题解决后仍然存在其他问题,在极端的情况下,线程一获取锁执行业务但发生了阻塞,导致锁过期释放,此时线程二获取了锁执行业务,线程一阻塞完成后释放锁,虽然课程是这么讲的,但我还是很疑惑的。
这里锁的实现逻辑是setnx + 唯一标识 + 过期时间
明明已经给释放锁加了判断条件,必须当前锁标识与之前上锁的标识一样才能释放锁,现在很显然线程二上了锁,线程一又怎么去给他释放呢?
这里可能的解释是,为了引出这个问题,描述的这么一种情况,可能锁标识校验没那么严谨。
如果是这样,发生的问题就是由于当前释放锁不具有原子性,可能在线程二执行查询锁标识和删除锁之间,被线程一抢占进程,导致释放锁。因为释放锁的过程是两个动作的接连发生,而不是一次性执行完。
Lua脚本解决多条命令原子性问题
语法简介:
Java调用Lua脚本改造分布式锁
RedisTemplate调用Lua脚本的API如下:
-- 比较线程的标识与锁的标识是否一致
if(redis.call('GET', KEYS[1]) == ARGS[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1]) -- 成功返回1
end
return 0 -- 失败返回0
重写工具类SimpleRedisLock,具体过程为首先初始化脚本
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "—";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;// 初始化脚本static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(Long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//尝试获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 调用Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}/*@Overridepublic void unlock() {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标识是否一致if(threadId.equals(id)){//释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}*/
}
这里的关键是:
- 检查(
get
)和 删除(del
)在一条 Lua 脚本中执行,Redis 会保证脚本的执行是原子性的。 - 如果不用 Lua,而是客户端先
get
再del
,就有可能在两条命令之间被其他线程插入,导致误删。
Redisson功能介绍
基于setnx实现的分布式锁存在下面的问题:
- 不可重入
同一个线程无法多次获取同一把锁
- 不可重试
获取锁只尝试一次就返回false,没有重试机制
- 超时释放
锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性
如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现
Redisson快速入门
Redisson可以理解为是在Redis基础上实现的分布式工具集,包含了各种分布式锁的实现。
- 首先引入redisson依赖
- 配置
RedissonConfig
配置类
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.31.33:6379").setPassword("123321");//创建client对象return Redisson.create(config);}
}
- 使用
Redisson
分布式锁
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) {// 根据优惠券id查询秒杀信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 判断当前时间是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 判断当前时间是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 已结束return Result.fail("秒杀已经结束!");}// 查询优惠券是否还有库存if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("Lock:order" + userId);//获取锁boolean isLock = lock.tryLock();//判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {// 释放锁lock.unlock();}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {// 一人一单Long userId = UserHolder.getUser().getId();// 1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {return Result.fail("库存不足");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 1.添加订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.下单用户idvoucherOrder.setUserId(userId);// 3.优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 返回订单idreturn Result.ok(orderId);}
}
Redisson可重入锁原理
为确保同一线程可以获取到多个锁,我们会给上锁的时候同时存储一个value值,这个value代表锁的重入次数,又因为同一线程锁的key是一样的,所以只要再获取锁就value++
。当然,代价就是我们无法再使用setnx
了,这个指令与可重入锁完全背道而驰。
现在的实现逻辑是
Redisson的锁重试和WatchDog机制
抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同
1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null
2、判断当前这把锁是否是属于当前线程,如果是,则返回null
所以如果返回是null,则代表着当前线程已经抢锁完毕,或者可重入完毕,但是如果以上两个条件都不满足,则进入到第三个条件,返回的是锁的失效时间。
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {return;
}
接下来会有一个条件分支,因为lock方法有重载方法,一个是带参数,一个是不带参数,如果带参数传入的值是-1,如果传入参数,则leaseTime是他本身,此时leaseTime != -1 则会进去抢锁,抢锁的逻辑就是之前说的那三个逻辑
if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
ttlRemainingFuture.onComplete((ttlRemaining, e);
这句话相当于对以上抢锁进行了监听,也就是说当上面抢锁完毕后,此方法会被调用,具体调用的逻辑就是去后台开启一个线程,进行续约逻辑,也就是看门狗线程
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;
此逻辑就是 续约逻辑,注意看
commandExecutor.getConnectionManager().newTimeout()
方法
Method(new TimerTask() {}, 参数2, 参数3)
指的是:通过 参数2、参数3 去描述什么时候去做参数1的事情,现在的情况是:10s之后去做参数一的事情。
-
锁的失效时间:30s
-
在锁创建 10s 后,
TimerTask
被触发,执行 续约操作:- 将当前这把锁的过期时间续约为 30s。
-
如果续约成功:
- 再次递归调用自身,重新设置一个
TimerTask()
。 - 等待 10s 后再次触发续约。
- 再次递归调用自身,重新设置一个
-
如此循环,完成 不停的续约,保证在业务未完成时锁不会过期。
-
如果 当前线程宕机,那么不会再进行续约。
-
因为此时没有线程去调用
renewExpiration()
方法。 -
当锁的过期时间到达后,锁就会 自然释放。
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
Redisson的MultiLock原理
在集群模式下,一般Redis会有一个主节点,多个从节点,主节点负责所有发向Redis的写操作,从节点负责读操作,所以主从节点会做数据同步,会有一定延时。
那么这时候,Java应用执行set命令获取了主节点的锁,但是此时主节点突然宕机,而且此时同步还未完成,客户端连接断开,集群的哨兵会监视并从剩余两个节点选择一个作为主节点。问题也就出现了,之前锁失效,对于新的主节点,任意线程都可以访问,造成了并发问题。
Redis是怎么解决的?
简单粗暴的取消主从关系,设置多个同等级节点,并且必须依次向每个节点获取锁成功才算成功获取锁,此时就不会有主从一致性问题了。
进一步优化,可以在节点之后添加从节点并作主从同步,即使某主节点故障,仍然得依次获取其他全部锁。这种实现方法称为MultiLock
- 不可重入Redis分布式锁:
- 原理: 利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
- 缺陷: 不可重入、无法重试、锁超时失效
- 可重入的Redis分布式锁:
- 原理: 利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷: redis宕机引起锁失效问题
- Redisson的multiLock:
原理: 多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷: 运维成本高、实现复杂
秒杀优化-异步秒杀
需求:
① 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
② 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足return 1
end
-- 3.2 判断用户是否下单
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明是重复下单,返回2return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
③ 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
④ 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
未完…
如果这篇文章对你有帮助,请点赞、评论、收藏,创作不易,你的支持是我创作的动力。