当前位置: 首页 > news >正文

Redis——基础篇

Redis介绍

  • Redis是一种NoSQL数据库,也称为缓存中间件

  • Redis和MySQL

    1. Redis保存半非结构化数据,MySQL保存结构化数据
    2. Redis运行在内存上,MySQL运行在硬盘上
    3. Redis语法不通用,MySQL遵循w3c的SQL语句
    4. Redis没有事务回滚功能,MySQL支持事务
  • Redis和ES

    1. Redis存储半结构化数据,ES存储非结构化数据,查询功能上ES更强大
    2. Redis完全运行在内存中,ES使用内存和磁盘的混合结构,性能上Redis更好,且Redis延迟更低
    3. Redis只能有限的分布式部署,ES天然具有分布式特性,数据量上ES更强大
    4. 高频读写、实时性要求高选择Redis;若需处理大规模数据、复杂搜索分析或数据可视化,选择Elasticsearch
  • Redis基本特征

    1. Redis运行在内存上,读写性能极高
    2. Redis默认单线程运行,并发访问时会严格顺序执行
    3. Redis虽然有持久化策略,但依然有数据丢失风险
    4. Redis支持多节点部署
  • 应用场景

    1. 查询缓存
    2. 分布式锁
    3. 分布式会话
    4. 消息队列
    5. 地理空间
    6. 位图统计

数据类型

Commands | Docs (redis.io)

通用规则

  • Redis 是一个基于内存的键值对(Key-Value)数据库,其核心数据结构围绕键值对展开

    1. 一般情况下,约定key的类型为字符串,下述的数据类型是描述value的类型
    2. key的值可以用冒号隔开,默认形成层级结构方便管理(例如项目:业务:类型:数据id
    set "com:wyh:user:id" 1
    
  • redis的命令对大小写不敏感

  • redis不严格要求引号声明字符串,redis会自动转换,但建议统一使用双引号

    set age 25
    # 隐式转换,等效于
    set "age" "25"
    
  • redis的结束符是换行符(回车),每一行就是一条命令,如果一次执行多条命令可以使用管道

    #\n是换行符,redis执行到此就结束当前命令的执行
    echo -e "SET key1 value1\nGET key1\nINCR counter" | nc localhost 6379
    
  • 通用命令

    1. select 数字:表示选取几号数据库
    2. flushall:清除当前数据库
    3. del key名[或者key列表]:删除/批量key
    4. exists key名[或者key列表]:查询是否存在key,返回存在的key的数量
    5. type key名:查看value数据类型
    6. expire key名 秒数:给key设置有效期,到期后key会被自动删除
    7. ttl key名:查看key剩余有效秒数
    8. keys 匹配表达式:查看匹配的key,可以模糊匹配(如keys *yh),不能跨库匹配**

String

  • 字符串类型是redis的基本类型,Redis的所有数据类型都可以归结为字符串的变种

  • redis没有数值类型,声明数值以字符串形式声明,做数值运算时redis会隐式转换

  • 应用场景

    1. 缓存热点数据(如 HTML 片段、用户信息)
    2. 分布式锁(通过 SET key value NX EX 实现互斥锁)
    3. 计数器(如页面访问量、点赞数)
  • 常见命令

    1. set 键 值:添加String类型的键值对或者修改String类型的键值对

    2. get 键:根据key获取String类型的value

    3. mset 键1 值1 键2 值2...:批量添加多个String类型的键值对,如果该key已经存在则修改该key的value

    4. mget 键1 键2...:根据多个key获取多个String类型的value

    5. incr 整数路径:让一个整数形式的字符串自增+1,返回自增后的value

    6. incrby 整数路径 步长:让一个整数形式的字符串按指定步长自增,返回自增后的value

    7. incrbyfloat 浮点数路径 步长:让一个浮点数形式的字符串按照指定步长自增(incr可以为负数)

    8. setnx 键 值:添加一个String类型的键值对,前提是这个key不存在,否则不执行(返回0)

    9. setex 键 值 秒数:添加一个String类型的键值对,并指定有效期,如果此key存在则修改并指定有效期

#新增键值对
127.0.0.1:1145> set name wyh
OK
#获取value
127.0.0.1:1145> get name
"wyh"
#修改已经存在的键值对
127.0.0.1:1145> set name wyh1
OK
127.0.0.1:1145> get age 1
"24"
#整数形式字符串自增+1
127.0.0.1:1145> incr age
(integer) 25
#key为status不存在,添加成功
127.0.0.1:1145> setnx status 8
(integer) 1
#key为status已经存在,添加不执行,返回0
127.0.0.1:1145> setnx status 6
(integer) 0
#设置name的value为wyh3,并指定有效期为3秒
127.0.0.1:1145> setex name 3 wyh3
OK

List

  • redis中的List一个双向链表,支持正向索引,也支持反向索引

  • 应用场景

    1. 消息队列(生产者-消费者模型)
    2. 最新消息列表(如用户最近的 100 条动态)
    3. 栈或队列(通过 LPUSH/RPOPRPUSH/LPOP 组合实现)
  • 常见命令

    1. lpush 键 元素 元素2...:向列表左侧加入元素

    2. lpop 键 count:删除列表中键对应的左侧count个元素,没有就返回nil

    3. rpush 键 元素1 元素2...:向列表右侧加入元素

    4. rpop 键 count:删除列表右侧开始count个元素,没有就返回nil

    5. lrange 键 开始游标 结束游标:返回列表中指定的一段数据,游标从0开始

    6. blpop 键 秒数 brpop 键 秒数:删除列表左侧/右侧第一个元素,没有就等待指定时长,还没有就返回nil

    7. lindex <key><index>:按照索引下标获得元素(从左到右)

    8. linsert <key> before <value><newvalue>:在value的后面插入newvalue插入值

    9. llen <key>:获得列表长度

    10. LSET key index element:修改index位置的元素值

    11. BLPOP key [key ...] timeout:从左侧阻塞地弹出元素,若列表为空则阻塞,直到超时或有数据

    12. BRPOP key [key ...] timeout:从右侧阻塞地弹出元素(类似 BLPOP,但方向相反)

#左插多个元素
127.0.0.1:1145> lpush student wyh1 wyh2 wyh3 wyh4
(integer) 4
#lrange 键 0 -1:查看列表所有值
127.0.0.1:1145> lrange student 0 -1
1) "wyh4"
2) "wyh3"
3) "wyh2"
4) "wyh1"
#右插多个元素
127.0.0.1:1145> rpush student1 wyh1 wyh2 wyh3 wyh4
(integer) 4
127.0.0.1:1145> lrange student1 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"
4) "wyh4"
#删除列表左侧开始两个元素,返回删除的元素
127.0.0.1:1145> lpop student 2
1) "wyh4"
2) "wyh3"
127.0.0.1:1145> lrange student 0 -1
1) "wyh2"
2) "wyh1"

Hash

  • redis中的Hash是一个无序Map字典

  • 应用场景

    1. 存储复杂对象(如用户信息、商品信息)
    2. 聚合统计(如记录商品的浏览次数、收藏量)
  • Hash常用命令

    1. hset 键 字段 字段值:添加或者修改一个键值对
    2. hget 键 字段:查询hash的指定字段的值
    3. hmset 键 字段1 字段值1 字段2 字段值2...:添加多个键值对
    4. hmget 键 字段1 字段2...:查询多个指定hash字段的字段值
    5. hgetall 键:获取一个键中所有的hash的字段:字段值
    6. hkeys 键:获取一个键中所有的hash的字段
    7. hvals 键:获取一个键中所有的hash的字段值
    8. hincrby 键 字段 步长:让hash类型中某一个字段值根据步长自增,注意字段值只能是整数
    9. hsetnx 键 字段 字段值:当hash类型不存在时才能添加hash类型,如果已经存在则返回0
    10. hexists <key1> <field>:查看哈希表 key 中,给定域 field 是否存在
#添加键
127.0.0.1:1145> hset user name wyh1
(integer) 1
#查询键为user,值的字段为name的字段值
127.0.0.1:1145> hget user name
"wyh1"
#添加键为user,值为{name:wyh,age:24,gender:male}
127.0.0.1:1145> hmset user name wyh age 24 gender male
OK
#查询键为user,值中字段为name和age的字段值
127.0.0.1:1145> hmget study:redis:student:1 name age
1) "wyh"
2) "24"
#user已存在,不执行
127.0.0.1:1145> hsetnx user name wyh
(integer) 0

Set

  • redis的set是元素不重复的集合

  • 应用场景

    • 标签系统(记录用户的兴趣标签)
    • 去重统计(如文章的唯一阅读用户)
    • 社交网络关系(如关注列表、粉丝列表)
  • 常见命令

    1. sadd 键 元素1 元素2...:向Set中添加一个或多个元素

    2. srem 键 元素2 元素2...:移除Set中指定的一个或多个元素

    3. scard 键:返回Set中元素的个数

    4. sismember 键 元素:判断元素是否在集合中

    5. smembers 键:获取集合中所有元素

    6. sinter 键1 键2...:求键1的set和键2的set交集

    7. sdiff 键1 键2...:求键1的set和键2的set差集

    8. sunion 键1 键2...:求键1的set和键2的set并集

#一次添加多个元素
127.0.0.1:6379> sadd name wyh1 wyh2 wyh3
(integer) 3
#集合不允许重复,插入已经存在的元素则不执行返回0
127.0.0.1:6379> sadd name wyh1
(integer) 0
#移除指定元素
127.0.0.1:6379> srem name wyh3
(integer) 1
#返回集合中元素数量
127.0.0.1:6379> scard name
(integer) 2
#判断指定元素是否在集合中,不在则返回0
127.0.0.1:6379> sismember name wyh1
(integer) 1
127.0.0.1:6379> sismember name wyh6
(integer) 0

Zset

  • Zset是可排序的set

    1. Zset底层通过跳表实现排序,元素带有score属性,score越大的元素在Zset的游标越大
    2. 底层通过哈希表实现查找
  • 应用场景

    1. 排行榜(实时更新游戏玩家分数排名)
    2. 带权重的队列(按优先级处理任务)
    3. 时间轴(按时间戳排序的消息列表)
  • 常见命令

    1. zadd 键 score1 元素1 score2 元素2...:添加一个或多个元素到Zset中,如果已经存在则修改score值

    2. zrem 键 元素1 元素2...:删除一个或多个指定元素

    3. zrank 键 元素:查看指定元素在集合中的排序

    4. zcard 键:获取Zset中元素的个数

    5. zcount 键 min max:统计score在min~max之间(包含min和max)的元素个数

    6. zrangebyscore 键 min max:获取score在min~max之间的元素

    7. zrange 键 min max:获取游标在min~max之间的元素

    8. zincrby 键 元素 步长:让集合指定元素的score根据步长自增

    9. zdiff,zinter,zunion:求差集、交集、并集

#添加多个元素到有序集合中
127.0.0.1:6379> zadd user 10 wyh1 20 wyh2 30 wyh3 40 wyh4
(integer) 4
127.0.0.1:6379> zrange user 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"
4) "wyh4"#删除有序集合中指定元素
127.0.0.1:6379> zrem user wyh4
(integer) 1
127.0.0.1:6379> zrange user 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"#查看有序集合中元素的个数
127.0.0.1:6379> zcard user
(integer) 3#统计score值在指定范围内的元素个数
127.0.0.1:6379> zcount user 20 30
(integer) 2#获取score值在指定范围内的元素
127.0.0.1:6379> zrangebyscore user 20 30
1) "wyh2"
2) "wyh3"#让wyh1元素的score值自增2
127.0.0.1:6379> zincrby user 2 wyh1
"12"

Java客户端

  • 常见客户端

    1. jedis:方法名称就是redis的命令名,使用简单
    2. lettuce:基于netty实现,支持redis的集群,支持同步、异步、响应式编程等高级功能
    3. redisson:基于分布式部署redis客户端
  • 一般业务中推荐使用lettuce,分布式部署(如 集群模式、哨兵模式、主从架构)推荐使用redisson

Jedis

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
public class JedisConnectionPool { //Jedis线程不安全,必须使用连接池private static final JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();//最大连接数jedisPoolConfig.setMaxTotal(10);//最大空闲连接jedisPoolConfig.setMaxIdle(10);//最小空闲连接jedisPoolConfig.setMinIdle(10);//设置最长等待时间jedisPoolConfig.setMaxWaitMillis(10);//选择0号库,连接超时时长2000msjedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 2000, "123456", 0);}//从连接池中获jedis连接public static Jedis getJedis() throws Exception{return jedisPool.getResource();}//回收jedis连接public static void close(Jedis jedis){if (jedis != null){//Jedis连接池不会自动回收连接,需要手动归还jedis.close();}}
}
public class Main {public static void main(String[] args) {//获取连接池中的jedis连接对象Jedis jedis = null;try {jedis = JedisConnectionPool.getJedis();} catch (Exception e) {System.out.println("连接失败");}//开始进行数据操作String result = null;if (jedis != null) {result = jedis.set("name","wyh");System.out.println("插入/修改键值对name:wyh,返回结果是:\n"+result);String name = jedis.get("name");System.out.println("查询键为name的值为:\n"+name);}//归还连接JedisConnectionPool.close(jedis);}
}

SpringDataRedis

  • Spring Data Redis是Spring官方为redis主流客户端的集成框架,同时提供了更丰富的功能
  1. 兼容jedis、lettuce(默认)等众多客户端
  2. 自带序列化和反序列化
  3. 支持redis发布订阅等高级功能
  4. 支持redis集群
<!--nosql-spring data redis-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--连接池门面,具体使用哪个连接池取决于使用哪种客户端,可以自动回收连接-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
<!--spring data redis默认使用lettuce,如果使用jedis需要在配置类中手动指定jedis(见下文)-->
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
  • 单机模式
# 单机模式快速配置
spring:redis:host: 127.0.0.1port: 6379password: yourpassword
@Configuration
public class RedisConfig {// 使用 Jedis 连接池(需要额外加jedis依赖)@Beanpublic RedisConnectionFactory jedisConnectionFactory() {// 1. 配置 Redis 服务器信息RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();redisConfig.setHostName("localhost");redisConfig.setPort(6379);redisConfig.setPassword("yourpassword"); // 如果没有密码,可以不设置redisConfig.setDatabase(0); // 默认数据库// 2. 配置 Jedis 连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8);          // 最大连接数poolConfig.setMaxIdle(8);           // 最大空闲连接数poolConfig.setMinIdle(0);           // 最小空闲连接数poolConfig.setMaxWaitMillis(1000);  // 获取连接超时时间(1秒)// 3. 配置 Jedis 客户端选项JedisClientConfiguration jedisClientConfig = JedisClientConfiguration.builder().connectTimeout(Duration.ofSeconds(10))  // 连接超时时间.readTimeout(Duration.ofSeconds(5))      // 读取超时时间.usePooling()                            // 启用连接池.poolConfig(poolConfig)                  // 设置连接池配置.build();// 4. 创建 JedisConnectionFactoryreturn new JedisConnectionFactory(redisConfig, jedisClientConfig);}// 使用 Lettuce 连接池(默认)@Beanpublic RedisConnectionFactory lettuceConnectionFactory() {// 1. 配置 Redis 服务器信息RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();redisConfig.setHostName("localhost");redisConfig.setPort(6379);redisConfig.setPassword("yourpassword");redisConfig.setDatabase(0);// 2. 配置 Lettuce 连接池GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(8);          // 最大连接数poolConfig.setMaxIdle(8);           // 最大空闲连接数poolConfig.setMinIdle(0);           // 最小空闲连接数poolConfig.setMaxWaitMillis(-1);    // 获取连接超时时间(-1 表示无限等待)// 3. 配置 Lettuce 客户端选项(可选)SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(10))  // 连接超时时间.build();ClientOptions clientOptions = ClientOptions.builder().socketOptions(socketOptions).timeoutOptions(TimeoutOptions.enabled()) // 启用超时选项.build();// 4. 构建 Lettuce 客户端配置LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig)                  // 设置连接池.clientOptions(clientOptions)            // 设置客户端选项.commandTimeout(Duration.ofSeconds(5))   // 命令超时时间.shutdownTimeout(Duration.ofMillis(100)) // 关闭超时时间.build();// 5. 创建 LettuceConnectionFactoryreturn new LettuceConnectionFactory(redisConfig, clientConfig);}/** * SpringBoot自动配置RedisTemplate/StringRedisTemplate,也可以做自定义增强* 如果使用多连接池,需要指定 RedisTemplate 使用哪个 ConnectionFactory* 1.修改客户端类型* 2.指定序列化策略* 3.配置redis高级功能(如发布订阅)*/@Beanpublic StringRedisTemplate stringRedisTemplate() {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(lettuceConnectionFactory()); // 明确指定 Lettuce//template.setConnectionFactory(jedisConnectionFactory()); // 也可以指定 Jedisreturn template;}@Beanpublic RedisTemplate<String, Object> redisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(lettuceConnectionFactory()); // 明确指定 Lettuce//template.setConnectionFactory(jedisConnectionFactory()); // 也可以指定 Jedis// 可选:配置序列化器template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}
}
  • 哨兵模式配置
@Configuration
public class RedisClusterConfig {/*** 配置哨兵模式(支持哨兵 + 读写分离)* 哨兵监听主节点,从节点由主节点发现,因此只配置哨兵即可*/@Beanpublic RedisConnectionFactory redisConnectionFactory() {// 配置 Redis Sentinel(哨兵)RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration();sentinelConfig.setMaster("mymaster"); // 设置主节点名称sentinelConfig.setPassword("123456"); // 设置主节点密码sentinelConfig.sentinel("192.168.1.100", "26379"); //哨兵1sentinelConfig.sentinel("192.168.1.101", "26379"); //哨兵2// 配置 Lettuce 连接池GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);// 构建 Lettuce 客户端配置LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig)                  // 设置连接池.readFrom(ReadFrom.REPLICA_PREFERRED) // 优先从从节点读取.commandTimeout(Duration.ofSeconds(5))   // 命令超时时间.shutdownTimeout(Duration.ofMillis(100)) // 关闭超时时间.build()return new LettuceConnectionFactory(sentinelConfig, clientConfig);}/*** 配置 StringRedisTemplate*/@Beanpublic StringRedisTemplate stringRedisTemplate() {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory());template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}
}
  • 分片模式配置
@Configuration
public class RedisClusterSentinelConfig {/*** 配置集群模式(支持哨兵 + 读写分离)* 集群模式无需配置哨兵,redis会自动在主从节点间故障转移,从节点由主节点发现,只配置主节点即可*/@Beanpublic RedisConnectionFactory redisConnectionFactory() {// 集群配置List<String> masterNodes = Arrays.asList("192.168.1.100:6379", "192.168.1.101:6379", "192.168.1.102:6379");RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(masterNodes);clusterConfig.setPassword("123456");// 客户端配置LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离.build();return new LettuceConnectionFactory(clusterConfig, clientConfig);}@Beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}
}

RedisTemplate

  • SpringDataRedis提供了RedisTemplate工具类,其中封装并统一了对redis的各种操作

    API类说明
    redisTemplate.opsForValue()操作string类型
    redisTemplate.opsForHash()操作hash类型
    redisTemplate.opsForList()操作List类型
    redisTemplate.opsForSet()操作set类型
    redisTemplate.opsForZSet()操作Zset类型
    redisTemplate通用操作redis命令
public class SpringDataRedisTest {@Resourceprivate RedisTemplate<String,Object> redisTemplate;@Testvoid testRedisString(){try {//写入一条数据,redisTemplate入参是对象会自动序列化为字节数组redisTemplate.opsForValue().set("user",new User());//获取一条数据User user =(User) redisTemplate.opsForValue().get("user");} catch (Exception e) {System.out.println("连接失败");throw new RuntimeException(e);}}
}

StringRedisTemplate

  • StringRedisTemplateRedisTemplate

    1. StringRedisTemplate默认使用JDK序列化,Redis保存的是不可读的字节数组,手动修改序列化器又消耗了空间性能
    2. StringRedisTemplate使用StringRedisSerializer序列化,Redis保存的是可读文本,可以存入Json字符串
  • 通用API(操作key)

    1. stringRedisTemplate.delete(String key):删除键
    2. stringRedisTemplate.delete(Collection<String> keys):批量删除键
    3. stringRedisTemplate.hasKey(String key):键是否存在
    4. stringRedisTemplate.expire(String key, long timeout, TimeUnit unit):设置键的过期时间
    5. stringRedisTemplate.getExpire(String key, TimeUnit unit):获取键的剩余生存时间(TTL)
    6. stringRedisTemplate.rename(String oldKey, String newKey):重命名键
    7. stringRedisTemplate.renameIfAbsent(String oldKey, String newKey):键不存在时才重命名
  • String-API

    1. StringRedisTemplate.opsForValue().set(K key, V value, long timeout, TimeUnit unit):新增/修改值
    2. StringRedisTemplate.opsForValue().get(K key):获取key下的值
    3. StringRedisTemplate.opsForValue().increment(K key, D delta):key下的值自增delta
    4. StringRedisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit):setnx,值不存在时才新增
    5. StringRedisTemplate.opsForValue().multiSet(Map<String, String> map):批量set多个值
    6. StringRedisTemplate.opsForValue().multiGet(Collection keys):获取多个值,以List类型接收
  • List-API

    1. StringRedisTemplate.opsForList().leftPush/rightPush(K key, V value):左插/右插
    2. StringRedisTemplate.opsForList().leftPushIfPresent/rightPushIfPresent(K key, V value):如果存在则添加元素
    3. StringRedisTemplate.opsForList().leftPop/rightPop(K key):移除左/右边元素
    4. StringRedisTemplate.opsForList().leftPop/rightPop(K key, long timeout, TimeUnit unit):移除失败则返回null
    5. StringRedisTemplate.opsForList().range(K key, long start, long end):获取指定区间的值,以List接收
    6. StringRedisTemplate.opsForList().size(K key):获取key下的List长度
    7. StringRedisTemplate.opsForList().index(K key, long index):获取游标位置的元素
    8. StringRedisTemplate.opsForLis().set(K key, long index, V value):指定位置插入元素
  • Hash-API

    1. StringRedisTemplate.opsForHash().put(H var1, HK var2, HV var3):插入/修改hash
    2. StringRedisTemplate.opsForHash().putIfAbsent(H key, HK var2, HV var3):如果存在hash则插入
    3. StringRedisTemplate.opsForHash().putAll(H key, Map<? extends HK, ? extends HV> map):批量插入/修改
    4. StringRedisTemplate.opsForHash().get(H var1, Object var2):获取key下键对应的值,返回Object,需要再手动强转
    5. StringRedisTemplate.opsForHash().multiGet(H key, Collection vals):获取key下的多个键vals对应的值,以List接收
    6. StringRedisTemplate.opsForHash().entries(H key):获取key下Hash的所有键值对,以Map接收
    7. StringRedisTemplate.opsForHash().keys(H key):获取key下Hash的所有键,以Set接收
    8. StringRedisTemplate.opsForHash().values(H key):获取key下Hash的所有值,以List接收
    9. StringRedisTemplate.opsForHash().delete(H key, Object var1 …): 根据key下的Hash中的键var1…,删除Hash的键值对
    10. StringRedisTemplate.opsForHash().increment(H key, HK var2, long long1):key下的键var2对应的值自增long1
    11. StringRedisTemplate.opsForHash().size(K key):获取key下的Hash长度
  • Set-API

    1. StringRedisTemplate.opsForSet().add(K var1, V... var2):添加一个或多个元素
    2. StringRedisTemplate.opsForSet().members(K key):获取key下的Set所有元素
    3. StringRedisTemplate.opsForSet().size(K key):获取key下的Set长度
    4. StringRedisTemplate.opsForSet().remove(K var1, Object... var2):移除Set成员元素
    5. StringRedisTemplate.opsForSet().isMember(K var1, Object var2):判断是否包含元素
    6. StringRedisTemplate.opsForSet().intersect(K var1, K var2):计算多个集合的交集,并返回结果集合
    7. StringRedisTemplate.opsForSet().union(K var1, K var2):计算多个集合的并集,并返回结果集合
    8. StringRedisTemplate.opsForSet().difference(K var1, K var2):计算两个集合的差集,并返回结果集合
  • Zset-API

    1. StringRedisTemplate.opsForZSet().add(K var1, V var2, double var3):添加一个成员,同时指定该成员的分数
    2. StringRedisTemplate.opsForZSet().range(K var1, long var2, long var4):按索引范围查询,返回由低到高排序的集合
    3. StringRedisTemplate.opsForZSet().reverseRange(K var1, long var2, long var4):按索引范围查询,由低到高
    4. stringRedisTemplate.opsForZSet().rangeByScore("key", minScore, maxScore):按score范围查询,包含边界
    5. stringRedisTemplate.opsForZSet().rangeByScore("key", minScore, maxScore, 0, -1):按score范围查询,不含边界
    6. StringRedisTemplate.opsForZSet().zCard(K var1):获取有序集合中的成员数
    7. StringRedisTemplate.opsForZSet().remove(K var1, Object... var2):移除ZSet成员
    8. StringRedisTemplate.opsForZSet().incrementScore(K var1, V var2, double var3):指定成员的分数增加指定步长
    9. StringRedisTemplate.opsForZSet().count(K var1, double var2, double var4):返回score范围内的成员数量
    10. StringRedisTemplate.opsForZSet().rank(K var1, Object var2):获取指定成员在有序集合中的排名,以long接收
@SpringBootTest
public class TestRedis {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Test// 测试redis连接public void testConnect() {String result = stringRedisTemplate.execute(RedisConnection::ping);Assert.assertEquals(result, "PONG"); //ping-pong检测}@Testpublic void test(){try {ObjectMapper objectMapper = new ObjectMapper();//写入一条数据String userDTO = objectMapper.writeValueAsString(new UserDTO());stringRedisTemplate.opsForValue().set("user",userDTO);//获取一条数据String userStr = stringRedisTemplate.opsForValue().get("user");UserDTO user = objectMapper.readValue(userStr, UserDTO.class);} catch (Exception e) {System.out.println("连接失败");throw new RuntimeException(e);}}
}

Redisson

  • redisson包含了Spring Data Redis依赖,但排除了jedis和lettuce客户端,如果要使用还是要加入Spring Data Redis依赖
  • redisson内置了连接池,无需额外配置数据源
<!-- Redisson 官方 Starter(推荐) -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.4</version> <!-- 使用最新版本 -->
</dependency>
@Configuration
public class RedissonConfig {@Bean("singleRedissonClient") //单机模式public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456").setDatabase(0);return Redisson.create(config);}@Bean("SentinelRedissonClient") //哨兵模式,只需要配置哨兵public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useSentinelServers().addSentinelAddress("redis://192.168.1.1:26379", "redis://192.168.1.2:26379") //两个哨兵.setMasterName("master") // master节点名称,节点地址在哨兵中已配置好.setPassword("yourpassword"); // master节点密码.setSentinelPassword("sentinel-password") // Sentinel 节点的密码(可选).setDatabase(0);return Redisson.create(config);}@Bean("ClusterRedissonClient") //集群模式,只需要配置主节点public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useClusterServers() .addNodeAddress("redis://192.168.1.1:6379", "redis://192.168.1.2:6379") //两个主节点.setPassword("yourpassword") //Redis 集群模式下,所有主节点和从节点的密码必须完全一致.setDatabase(0);return Redisson.create(config);}
}

Redis应用

分布式ID

  • 目的:分布式环境下,防止ID重复

  • 原理:推荐雪花算法思想,可以将本地序列号改为redis自增生成序列号

    1. 第一个部分:只有一个符号位且永远是0,表示id永远是正数,如果使用无符号 long,可以忽略这一位
    2. 第二部分:31位,表示当前时间的时间戳
    3. 第三部分:32位,使用redis的incrby命令递增生成序列号
    4. 其他:如果业务有相关需求,可以再将相关数据加入其中(例如可以再拼接机器信息)
  • 替代方案

    1. UUID:可能重复,但概率极低
    2. 雪花算法(推荐):时间严格递增、工作节点 ID 唯一,因此几乎不会重复,早期存在时钟回拨的问题,但已修复
    3. Leaf(美团开源方案):几乎不会重复
  • 举例:分布式ID生成器

@Component
public class RedisIdUtil {// 时间戳位数private static final int TIMESTAMP_BITS = 31;// 序列号位数private static final int COUNT_BITS = 32;@Autowiredprivate StringRedisTemplate stringRedisTemplate;// 入参是业务前缀,用于区分不同的业务,key格式为 service:日期:id(防止多个业务共用一个ID生成器)public Long getId(String service){LocalDateTime now = LocalDateTime.now();// 1.生成时间戳long timestamp = now.toInstant(ZoneOffset.UTC).toEpochMilli();// 2.生成序列号,不同的业务有不同的id,创建一个键专门存储订单数量String day = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); //精确到天,每天更新id,如果键不存在,则redis会自动创建这个key并初始值取0long count = stringRedisTemplate.opsForValue().increment(servicePrefix + ":" + day +":id"); // 3.拼接return timestamp << COUNT_BITS | count;}
}

分布式会话

  • 目的:防止session失效

  • 原理

    1. 用户首次登录时,后台生成唯一的sessionID作为key,用户信息作为value保存在redis中,返回sessionID
    2. 用户再发出请求时authorization携带sessionID,后台查询redis中key为sessionID下的用户信息,如果无效就返回重新登录,如果验证用户信息有效,还需要根据业务场景延长有效期
    3. 验证通过后,可以将用户信息保存在ThreadLocal中,便于后续复用用户信息
  • 替代方案

    1. JWT
    2. Token(OAuth2)
  • 举例:分布式会话中,实现同一用户指定时间内粘性登录

    1. 要求:有的请求需要验证登录,用的不需要;但如果已经登录,不需要验证登录的请求也会重置登录有效期
    2. 为更好维护,可以使用两个拦截器:第一个拦截所有请求重置有效期,第二个拦截需要验证身份的请求验证登录
//临时保存token的ThreadLocal类
public class ThreadLocalDto {public static final ThreadLocal<UserDTO> threadLocal = new ThreadLocal<UserDTO>();}
//重置有效期拦截器,如果已经登录就重置有效期再放行,其他情况直接放行
@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler) throws Exception {//1.获取tokenString token = req.getHeader("authorization");//2.如果不存在说明没有登录过,给登录拦截器处理if (null == token || token.isEmpty()){return true;}//3.token存在,说明用户登录过,判断用户信息是否过期String userDTOStr = stringRedisTemplate.opsForValue().get("session:"+token);//4.如果过期就放行,让登录验证处理if (userDTOStr == null || userDTOStr.isEmpty()){return true;}//5.如果没有过期,就保存用户信息到ThreadLocal中,并重置有效期60分钟UserDTO userDTO = JSON.parseObject(userStr, UserDTO.class)ThreadLocalDto.threadLocal.set(userDTO);stringRedisTemplate.expire("login:token:"+token, 60, TimeUnit.MINUTES);return true;}}//需要登录的请求再走登录验证拦截器
@Component
public class LoginInterceptor implements HandlerInterceptor {//请求执行前置拦截,实现校验登录功能@Overridepublic boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler) throws Exception {//1.判断是否登录,即判断ThreadLocal是否有值if (ThreadLocalDto.threadLocal.get() == null){//2.如果没有值,说明用户没有登录或者登录过期,拦截让用户重新登录res.setStatus(401);return false;}//3.如果有值,说明用户已经登录了,于是放行return true;}//请求执行完毕后置处理,移除用户信息,避免内存泄露@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {ThreadLocalDto.threadLocal.remove();}
}
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {//RefreshTokenInterceptor先执行,拦截所有请求,如果已登录检查有效期registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);//LoginInterceptor后执行,排除不用登录验证的请求registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code", //验证码请求不用拦截"/user/login", //登录请求不用拦截).order(1);} 
}

验证码登录

  • 目的:验证码验证身份

  • 原理

    1. 验证码发送后,将验证码存在redis中设置有效期,有效期内验证成功即可
    2. 使用redis记录用户发送验证码次数,如果次数过多就拒绝请求,前端也可以先实现请求间隔限制,做第一层防护
  • 举例:实现验证码登录功能

    1. 验证码发送后指定时间内有效
    2. 验证码不能频繁发送
    3. 用户数量一般不会达到海量级别,Session登录的token直接使用UUID即可
@Service
public class UserService implements IUserService{@Autowiredprivate UserMapper userMapper;@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** 生成指定位数的随机数字验证码*/private String generateCode(int length) {StringBuilder code = new StringBuilder();Random random = new Random();for (int i = 0; i < length; i++) {code.append(random.nextInt(10)); // 0-9的随机数}return code.toString();}/*** 发送验证码功能*/@Overridepublic Result sendCode(String phone) {// 1.校验手机号,使用正则表达式来匹配if (RegexUtils.isPhoneInvalid(phone)){return Result.fail("无效的手机格式");}// 2.检查是否请求次数过多,设置key统一为login:code:count:phoneString count = stringRedisTemplate.opsForValue().get("login:code:count:" + phone );if (null != count && Integer.parseInt(count) >= 3) {return Result.fail("超过今日的发送次数");}// 3.开始验证码流程,设置请求次数计数器if (null == count){// 验证次数每天刷新,每天限制三条stringRedisTemplate.opsForValue().set("login:code:count" + phone, "1", 24, TimeUnit.HOURS); }else {stringRedisTemplate.opsForValue().increment("login:code:count" + phone, 1);}// 4.生成并发送一个验证码,保存验证码到redis,有效期五分钟String code = generateCode(6);stringRedisTemplate.opsForValue().set("login:code:" +phone,code, 2, TimeUnit.MINUTES);// 5.发送验证码// TODOreturn Result.ok();}/*** 登录功能*/@Overridepublic Result login(UserDTO userDTO) {String phone = userDTO.getPhone();// 1.校验手机号if (RegexUtils.isPhoneInvalid(phone)){return Result.fail("无效的手机格式");}// 2.从redis取出校验验证码验证String code = stringRedisTemplate.opsForValue().get("login:code:" + phone);if (null == code || !userDTO.getCode().equals(code)){return Result.fail("验证失败或验证码已过期");}UserPO userPO = userMapper.getUserByPhone(phone);// 3.如果用户不存在,则注册用户if (null == userPO){userPO = BeanUtil.copyProperties(userDTO, userPO.class);userMapper.insert(userPO);}// 4.生成分布式会话token保存在redis中String token = UUID.randomUUID().toString(true);String jsonUserDto = JSON.toJSONString(userDTO);stringRedisTemplate.opsForValue().set("login:token:"+token, jsonUserDto, 60L, TimeUnit.MINUTES);// 7.返回给用户token,下次请求带此token,实现粘性登录return Result.ok(token);}
}

分布式锁

  • 目的:分布式环境下,需要支持多节点共享锁资源

setnx分布式锁

  • 原理

    1. 使用redis模拟锁的生成,线程访问资源前先setnx实现持锁过程
    2. 为了防止死锁,分布式锁可以设置有效期,但这样要注明线程标识,防止锁自动失效导致其他线程误删
  • 替代方案

    1. 复杂场景可以使用ZooKeeper或者Redisson实现分布式锁
    2. 可以使用原子性操作,例如插入脚本、使用redisson分布式集合、无锁算法等
    3. 简单场景下可以直接使用数据库的事务功能保证原子性
  • 举例:使用分布式锁解决多端登录问题

    1. 业务要求,用户可以多端同时登录,但指定业务同一时间只能在一台设备上操作(例如抢购、支付等功能)
    2. 注意防止锁误删:请求A执行时间过长导致锁自动释放,请求B持锁时,请求A完成导致释放请求B的锁
@RestController
@RequestMapping("/voucher-order")
public class OrderController {@Autowiredprivate IOrderService orderService;@Autowiredprivate StringRedisTemplate stringRedisTemplate;//抢购功能同一时间只能在一态设备上操作@PostMapping("order/{id}")public Result seckillOrder(@PathVariable("id") Long goodId, HttpServletRequest request) {// 获取用户信息,如果未登录会提前被拦截Long userId = ThreadLocalDto.threadLocal.get().getId();// 本线程标识String threadId = UUID.randomUUID().toString();// 拿分布式锁,value为线程标识,防止锁自动释放导致锁误删Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent("lock:order:" + userId, threadId, 5, TimeUnit.MINUTES);if (!ifAbsent) { // 未拿到锁,说明其他设备正在操作本业务return Result.fail("其他设备正在处理,请重试");}try{ // 拿到锁,开始执行业务return orderService.panicBuy(goodId);}finally{// 释放锁前先检查是否是自己的锁,这两步也可以使用LUA脚本严格保证原子性String redisThreadId = stringRedisTemplate.opsForValue().get("lock:order:" + userId);if(redisThreadId.equals(threadId)){stringRedisTemplate.delete("lock:order:"+userId);}}}
}

LUA脚本

  • 目的:setnx分布式锁无法满足一些业务场景时,可以使用LUA脚本操作redis

  • 原理

    1. LUA脚本具有原子性
    2. 如果业务逻辑较复杂,无法使用LUA脚本一次性完成,可以使用Zookeeper或者Redisson的分布式锁功能
  • StringRedisTemplate调用LUA脚本

    stringRedisTemplate.execute(RedisScript<T> var1, List<K> var2, Object... var3);
    // List<String> keys: 代表 Redis 键(Key),是 Lua 脚本中通过 KEYS[1], KEYS[2]... 访问的变量
    // Object... args: 代表额外参数,是 Lua 脚本中通过 ARGV[1], ARGV[2]... 访问的变量
    
  • LUA脚本

    1. 行结束符是换行(回车)
    2. 局部变量以local修饰,全局变量不加修饰符
    3. 字符串以单引号或者双引号表示,用..拼接字符串
    4. LUA脚本默认以字符串接收参数,tonumber(str)转换为数值,tostring(num)转换为字符串,str == "true"表示布尔值
    -- LUA接收调用参数
    local key = KEYS[1] -- 对应 Java 代码中的 keys.get(0)
    local value = ARGV[1] -- 对应 Java 代码中的 args[0]
    local bool = (key == "true") -- 布尔值没有专门的函数,可以使用逻辑表达式转换
    local num = tonumber(value) -- 数字类型需要转换
    -- LUA调用redis命令
    redis.call("命令名称","key","value",...,"其他参数")
    -- redis.pcall作用和redis.call相同,单可以捕获异常信息
    local ifSuccess, err = redis.pcall("SET", "mykey", "Hello") -- ifSuccess布尔值表示执行结果,如果有异常err会捕获信息
    if not success thenprint("Error:", err)
    elseprint("Success:", success)
    end
    
  • 举例:高并发抢购商品,使用LUA脚本防止超卖

    1. 使用setnx分布式锁不合适,如果用户抢购时未成功持锁就直接失败,会影响用户体验,即系统显得不可用
    2. 使用LUA脚本扣减库存保证原子性
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {// 1.查询抢购相关信息/*商品信息以Hash保存*key为good:goodId*value为{beginTime:begin,end:endTime,stoke:stoke}*/String begin = (String) stringRedisTemplate.opsForHash().get("good:" + goodId, "beginTime");String end = (String) stringRedisTemplate.opsForHash().get("good:" + goodId, "endTime");Long userId = ThreadLocalDto.threadLocal.get().getUserId();// 2.不在抢购期内直接失败if (LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 3.调用lua脚本开始抢购,尝试扣减库存Long luaResult = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.singletonList(goodId));if (luaResult == -1) {// 没有库存,抢购失败return Result.fail("抢购失败");}// 4.库存扣减成功,创建临时订单存入redis缓存,这一步也可以异步完成提高效率/*使用Set保存订单ID便于查询用户名下的订单,使用String保存订单具体内容便于删除*key为order:user:userId,value为订单ID的集合*key为order:orderId,value为订单内容*/Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idStringRedisTemplate.opsForSet().add("order:user:"+userId, String.valueOf(orderId));OrderDto orderDto = OrderDto.builder() //OrderDto使用Lombok或手动实现了Builder 模式.orderId(orderId).goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();stringRedisTemplate.opsForValue().set("order:"+orderId, JSON.toJSONString(orderDto));// 5.返回订单idreturn Result.ok(orderId);}
}
local goodId = KEYS[1] -- 抢购商品id
local stock = redis.call('hget', 'good:' .. goodId, 'stock') -- 扣减库存
if(tonumber(stock) <= 0) then --库存不足,下单失败,返回-1return -1 
end
redis.call('hincrby', 'good:' .. goodId, 'stock',- 1) -- 库存充足,扣减库存
return 0 -- 扣减成功,返回0

Redlock算法

  • 目的:传统setnx分布式锁在redis集群环境中可能因为主节点宕机,从节点未来得及同步,导致分布式锁失效,可使用Redlock算法

  • 原理

    1. 多数派机制:对每个节点都setnx加锁,如果成功节点数 ≥ N/2 + 1,则认为加锁成功
    2. 独立时钟校验:每个节点setnx加锁时,记录各自的过期时间,保证所有节点的锁ttl同时失效
    3. 释放锁:向所有节点发送 Lua 脚本解锁(仅删除自己持有的锁)
  • 其他方案

    1. RedLock适用于redis集群架构,建议使用Redisson的分布式锁
    2. 可以使用zookeeper加分布式锁
@Configuration
// 配置redis集群
public class RedisClusterConfig {// 1. 定义多个 Redis 连接工厂@Beanpublic RedisConnectionFactory redisConnectionFactory1() {return new LettuceConnectionFactory("127.0.0.1", 6379);}@Beanpublic RedisConnectionFactory redisConnectionFactory2() {return new LettuceConnectionFactory("127.0.0.1", 6380);}@Beanpublic RedisConnectionFactory redisConnectionFactory3() {return new LettuceConnectionFactory("127.0.0.1", 6381);}// 2. 定义多个 StringRedisTemplate,并交给 Spring 管理@Beanpublic StringRedisTemplate redisTemplate1(RedisConnectionFactory redisConnectionFactory1) {return new StringRedisTemplate(redisConnectionFactory1);}@Beanpublic StringRedisTemplate redisTemplate2(RedisConnectionFactory redisConnectionFactory2) {return new StringRedisTemplate(redisConnectionFactory2);}@Beanpublic StringRedisTemplate redisTemplate3(RedisConnectionFactory redisConnectionFactory3) {return new StringRedisTemplate(redisConnectionFactory3);}// 3. 注入 List<StringRedisTemplate>@Beanpublic List<StringRedisTemplate> redisTemplates() {return List.of(redisTemplate1(), redisTemplate2(), redisTemplate3());}
}
@Component
public class RedLockUtil {private final List<StringRedisTemplate> redisTemplates; // 多个 Redis 节点private final int quorum; // 法定数量(大多数)@Autowired // 可以省略(Spring 4.3+ 自动注入构造函数)public RedLockUtil(List<StringRedisTemplate> redisTemplates) {this.redisTemplates = redisTemplates;this.quorum = redisTemplates.size() / 2 + 1; // 在构造时计算}public boolean tryLock(String lockKey, long expireTime, long retryDelay, int maxRetries) {String lockValue = UUID.randomUUID().toString(); // 唯一标识,防止误删其他客户端的锁for (int i = 0; i < maxRetries; i++) {int successCount = 0;long startTime = System.currentTimeMillis();// 遍历所有 Redis 节点,尝试获取锁for (StringRedisTemplate redisTemplate : redisTemplates) {Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {successCount++;}}// 计算获取锁的总耗时long elapsed = System.currentTimeMillis() - startTime;// 如果大多数节点获取成功,并且总耗时小于锁过期时间,则认为加锁成功if (successCount >= quorum && elapsed < expireTime) {return true;}// 否则,释放所有节点上的锁(避免部分节点获取成功但最终失败)unlockInner(lockKey, lockValue);// 等待一段时间后重试(避免活锁)try {Thread.sleep(retryDelay + (long) (Math.random() * retryDelay));} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}return false;}public void unlock(String lockKey, String lockValue) {for (StringRedisTemplate redisTemplate. : redisTemplates) {// 建议下面改用 Lua 脚本确保原子性if (lockValue.equals.(redisTemplate.opsForValue().get(lockKey))){redisTemplate.delete(lockKey);}}}
}

任务队列

  • 目的:实现消息队列功能

  • 替代方案

    1. 使用Redisson的RTopic功能
    2. 使用MQ

消息队列

  • 特点

    1. 插入或获取操作立即返回,如果获取失败返回nil
    2. 要求redis5以上,支持消息持久化、支持ACK 机制、支持消息回溯、支持消费分组
  • Redis 5.0+参考kafka思想,引入了专门为消息队列设计的Stream数据结构

    1. Stream:类似List类型,value是链表,用于存储消息,每个消息节点 = 消息ID(可自动生成)+ 消息内容(键值对表示)
    2. Consumer Group:消费者组,同步存储对应key下Stream中的消息,主要有以下功能
      • 消息分发:将 PEL 中的未确认消息轮流分配给消费者,确保消息不重复分发,保证消费者组内的负载均衡
      • 进度管理:消费者组记录最后处理成功的消息 ID,确保新加入的消费者从正确位置开始消费
      • 消息重试:费者读取消息后,若未确认,消息会保留在 PEL 中,超时可以重新分配
      • 消息回溯:支持从任意位置重新消费
    3. Pending Entries List (PEL机制):每个消费者组维护一个 PEL,并记录未确认消息
  • 原理:生产者—指定Strem组—指定消费组—消费者

    1. 发送消息:生产者调用 XADD 命令向 Stream 添加消息,Stream 中每个节点存储一条消息

      XADD order_stream * product "iPhone" amount 2 customer "Alice"
      # order_stream:Stream 的键名
      # *:让 Redis 自动生成消息 ID
      # product "iPhone" amount 2 customer "Alice" :消息内容(键值对形式)
      
    2. 创建消费者组:消费者组首次消费前,需调用 XGROUP CREATE 创建组

      XGROUP CREATE order_stream order_group $ MKSTREAM
      # order_stream:Stream 键名
      # order_group:消费者组名
      # $:从最新消息开始消费,或 0 从最早开始
      # MKSTREAM:如果 Stream 不存在则自动创建
      
    3. 获取消息:消费者的消费操作XREADGROUP会自动注册到消费者组中,XREAD是消费者独立读取(不建议,可能会重复消费)

      XREADGROUP GROUP order_group consumer1 COUNT 1 STREAMS order_stream >
      # order_group:消费者组名
      # consumer1:消费者名称(需唯一,如基于进程 ID 或机器名)
      # COUNT 1:每次读取 1 条消息
      # >:仅读取未分配给其他消费者的新消息
      
    4. 消息确认:消费者消费完消息后,调用 XACK 确认消息,PEL 会自动移除该消息,表示消息已成功处理

      XACK order_stream order_group 1630000000000-0
      # order_stream:Stream 键名
      # order_group:消费者组名
      # 1630000000000-0:消息 ID
      
    5. 消息重试:消息超时未确认,可以调用 XCLAIM 将消息重新分配给其他消费者

      XCLAIM order_stream order_group new_consumer 300000 1630000000000-0
      # order_stream:Stream 键名
      # order_group:消费者组名
      # new_consumer:新的消费者名称
      # 300000:消息最小未确认时间(毫秒,即 5 分钟)
      
    6. 消息回溯:XREADXREADGROUP 指定起始 ID

      XREAD COUNT 10 STREAMS order_stream 1630000000000-0
      # 从指定消息 ID 开始读取历史消息
      # COUNT 10:限制最多返回 10 条消息
      
  • RedisTemplate对应API

    1. StringRedisTemplate.opsForStream().add(String key, Map<String, String> value):提交消息到指定stream
    2. StringRedisTemplate.opsForStream().createGroup(String streamkey, String groupKey):创建消费者组
    3. StringRedisTemplate.opsForStream().read(Consumer cons, StreamReadOptions Opt, StreamOffset streams):获取消息
    4. StringRedisTemplate.opsForStream().acknowledge(String streamkey, String groupKey, String... recordIds):确认ack
@Service
public class OrderService{ //生产者@Autowiredprivate StringRedisTemplate redisTemplate;public void addToStream(OrderDto orderDto) {String orderTask = JSON.toJSONString(orderDto);Map<String, String> orderMap = JSON.parseObject(orderTask, new TypeReference<Map<String,String>>() {});;// 添加到 Stream(如果Stream不存在会自动创建),自动生成消息IDRecordId recordId = stringRedisTemplate.opsForStream().add("order_stream", orderMap);}
}    
@Service
public class OrderQueueConsumer{private final StringRedisTemplate stringRedisTemplate;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate stringRedisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.stringRedisTemplate = stringRedisTemplate;this.consumerPool = consumerPool;initializeConsumerGroup();consumerPool.execute(this::readMessagesFromStream);}private void initializeConsumerGroup() {// 检查消费者组是否已存在(Stream会自动创建,但消费者组不会自动创建)List<StreamGroupInfo> groups = stringRedisTemplate.opsForStream().listGroups("order_stream");boolean groupExists = groups.stream().anyMatch(g -> GROUP_NAME.equals("order_group"));if (!groupExists) {stringRedisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);} }/*** 正常消费消息(Pending 列表)*/public void readMessagesFromStream() { //消费消息// 在调用 read() 前,始终通过 createGroup() 确保消费者组存在stringRedisTemplate.opsForStream().createGroup("order_stream", "order_group");while (true){// 从 ID "0" 开始读取,最多 10 条消息List<MapRecord<String, Object, Object>> messages = stringRedisTemplate.opsForStream().read(Consumer.from("order_group", "consumer1"), // 消费者组StreamReadOptions.empty().count(10).block(Duration.ofSeconds(0)), // 非阻塞,-1表示一直阻塞StreamOffset.create("order_stream", ReadOffset.lastConsumed()); // 从上次消费的位置继续);if (messages == null || messages.isEmpty()) {// 短暂休眠,避免频繁轮询Thread.sleep(1000);}messages.forEach(message -> {// 模拟业务处理(可能成功或失败)if(simulateBusinessProcessing(message)){// 处理成功,确认消息stringRedisTemplate.opsForStream().acknowledge("order_stream", "order_group", message.getId());}else {// 处理失败,不 ACK(消息会留在 Pending 列表,下次重试)System.out.println("Message " + messageId + " processing failed. Will retry later.");throw new RuntimeException("Processing failed");}});}}/*** 模拟业务处理(随机成功或失败)*/private boolean simulateBusinessProcessing(Map<Object, Object> values) {// 模拟随机失败(实际业务中替换为真实逻辑)return Math.random() > 0.3; // 70% 成功率}/*** 处理未确认的消息(Pending 列表)*/@Asyncprivate void readPendingMessagesFromStream() {List<MapRecord<String, Object, Object>> readPendingMessages = stringRedisTemplate.opsForStream().read(Consumer.from(GROUP_NAME, CONSUMER_NAME),StreamReadOptions.empty().count(10), // 每次读取10条StreamOffset.create(STREAM_KEY, ReadOffset.from(">")) // ">" 表示 Pending 列表);if (pendingMessages == null || pendingMessages.isEmpty()) {// 短暂休眠,避免频繁轮询Thread.sleep(1000);}for (MapRecord<String, Object, Object> message : pendingMessages) {... // 重试模式逻辑内容}}
}

阻塞队列

  • 特点:生产者消费者模型,利用阻塞队列避免忙等待

  • 原理:List类型的BLPOP/BRPOP 命令

    1. RPUSH key value [value ...]:从右侧插入数据(生产者使用,左查对应右插,保证任务时间上的相对顺序)
    2. BLPOP key [key ...] timeout:从左侧阻塞地弹出元素,若列表为空则阻塞直至有数据或超时(返回nil)
    3. 为保证顺序,右插—左取、左插—右取
  • RedisTemplate中没有BLPOP命令对应的方法,可以采用以下两种方法

    1. RedisTemplate.execute()调用原生命令(推荐使用)
    2. 直接使用 Lettuce/Jedis(不建议,因为会降低与 Spring 生态的集成度)
  • 举例:用户主动取消支付,实时异步执行取消订单

    1. 消费者端获取消息
    2. 消费者端异步自动消费,可设置阻塞时间(例如空列表时一直阻塞)
@Service
public class OrderService { // 生产者@Autowiredprivate StringRedisTemplate redisTemplate;public void cancelOrder(OrderDto orderDto) {String orderTask = JSON.toJSONString(orderDto);redisTemplate.opsForList().rightPush("my_task_queue", orderTask);}
}
@Component
public class OrderQueueConsumer { //消费者private final StringRedisTemplate redisTemplate;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate redisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redisTemplate = redisTemplate;this.consumerPool = consumerPool;consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {String task = redisTemplate.execute((RedisConnection connection) -> {List<byte[]> result = connection.bLPop(0, "order_cancel_queue".getBytes()); // 0 = 无限阻塞if (result != null && !result.isEmpty()) {return new String(result.get(1));}return null; // 理论上不会执行,因为 timeout=0});System.out.println("Processing task: " + task);}}
}

延迟队列

  • 目的:当前任务执行完毕后,一段时间后自动执行指定任务

  • 原理:Zset+轮询

    1. 使用Zset构建一个延迟队列保存延迟任务,score为过期时间,这样可以使用范围查询高效获取到期任务
    2. 使用定时任务(例如Spring的@Scheduled功能)查询到期任务
    3. 处理到期任务,为了解耦延迟逻辑和任务执行逻辑,也可以将到期任务放入任务队列中进一步处理
  • 其他方案:希望更准时执行任务

    1. 使用Redissson的RDelayedQueue功能
    2. 使用MQ的延迟消息功能
  • 举例:订单超时自动取消

    1. 用户下订单后,将临时订单信息保存到redis缓存,并将此订单加入redis的延迟队列中
    2. 如果用户已付款,则将该订单信息保存到数据库中,删除延迟队列中的该订单信息
    3. 如果用户超时未付款,删除延迟队列和缓存中的该订单信息
@Service
public class TemporaryOrdersDelayed {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate OrderMapper orderMapper;/** 添加延迟任务* @param taskId 任务ID* @param delaySeconds 延迟时长(秒)* @param taskData 任务内容*/public void addDelayedTask(Long orderId, Long userId, long delaySeconds) {long executeTime = Instant.now().getEpochSecond() + delaySeconds;//加入延迟队列/*延迟队列使用ZSet保存,便于范围查询*key为order:delayed_queue*value为userId:oderId*/stringRedisTemplate.opsForZSet().add("order:delayed_queue", userId+":"+oderId, executeTime);}// 每秒检查一次到期任务@Scheduled(fixedRate = 1000)public void pollTasks() {long now = Instant.now().getEpochSecond();// 查询到期任务,即时间早于当前时间Set<String> orders = stringRedisTemplate.opsForZSet().rangeByScore("order:delayed_queue", 0, now);if (orders == null || orders.isEmpty()) {return; // 无任务,直接返回}// 执行到期任务,这里直接执行,逻辑较复杂时建议放入阻塞队列中进一步处理for (String order : orders) {String[] orderArray = order.trim().split(":");Long userId = Long.parseLong(orderArray[0]);Long orderId = Long.parseLong(orderArray[1]);// 查询数据库,判断用户是否付款OrderPo orderPo = orderMapper.getOrder(orderId);if (orderPo == null){ //未付款,取消订单,恢复库存// 如果服务部署了多个实例,这里就需要保证原子性,防止超卖stringRedisTemplate.opsForSet().remove("order:user:"+userId, orderId); stringRedisTemplate.delete("order:"+orderId);stringRedisTemplate.opsForHash().increment("good:"+orderDto.getGoodId(), "stoke", 1);}// 延迟队列移除任务stringRedisTemplate.opsForZSet().remove("order:delayed_queue", order);}}
}

发布订阅

  • 目的:生产者发出消息广播,消费者实时接收消息通知

  • 原理:redis发布订阅功能(Pub/Sub)

    1. 创建一个频道,订阅端先订阅此频道
    2. 发布端对此频道发布消息,订阅端会实时接收到消息
  • 发布订阅功能应用场景

    1. 消息不持久化,不适用核心业务
    2. 适用于发布订阅一对一场景,否则可能会消息堆积
  • redis命令行演示

    #打开一个客户端订阅频道wyh,必须先订阅才能接受消息
    127.0.0.1:6379> subscribe wyh
    Reading messages... (press Ctrl-C to quit)
    #打开另一个客户端,给wyh频道发布消息hello
    127.0.0.1:6379> publish wyh hello
    #第一个客户端可以实时接收消息
    127.0.0.1:6379> SUBSCRIBE wyh
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "wyh"
    3) (integer) 1
    1) "message"
    2) "wyh"
    3) "hello"
    
  • 举例:实时更新订单状态

    1. 创建任务订阅类:实现MessageListener接口即可
    2. 配置redis消息监听器RedisMessageListenerContainer:绑定订阅者和频道
    3. 发布者向频道发布消息:redisTemplate.convertAndSend(channel, message)
    @Service
    public class oderService { //发布者@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发布消息到指定频道* @param channel 频道名* @param message 消息内容*/public void publish(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
    }
    
    @Component
    public class OrderSubscriber implements MessageListener { //订阅者@Autowiredprivate StringRedisTemplate stringRedisTemplate;/** MessageListener接口已经默认实现了使用Message传输消息* 如果不实现MessageListener接口,就需要在Redis消息监听容器配置类中设置消息转换器*/@Overridepublic void onMessage(Message message, byte[] pattern) {String taskData = new String(message.getBody());String channel = new String(message.getChannel());String patternStr = pattern != null ? new String(pattern) : "null"; //pattern是订阅模式,普通订阅为null// 执行消费任务逻辑(如调用服务处理任务)OrderDto orderDto = JSON.parseObject(taskData, OrderDto.class);// 更新订单状态,key:orderID,value:状态stringRedisTemplate.opsForValue().set(orderDto.getOrderId,orderDto.getStatus);}
    }
    
    @Configuration
    public class RedisPubSubConfig { // Redis消息监听容器配置类private final OrderSubscriber orderSubscriber; // 订阅者private final RedisConnectionFactory redisConnectionFactory;@Autowiredpublic RedisPubSubConfig(OrderSubscriber orderSubscriber, RedisConnectionFactory redisConnectionFactory) {this.orderSubscriber = orderSubscriber;this.redisConnectionFactory = redisConnectionFactory;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 订阅频道:channel:ordercontainer.addMessageListener(orderSubscriber, new ChannelTopic("channel:order"));// 可以订阅多个频道// container.addMessageListener(subscriber, new ChannelTopic("channel:news"));return container;}
    }
    

业务幂等

  • 目的:因网络等问题前端发送重复请求时,后端只处理一次,也可以用于解决MQ重复消费问题

  • 原理

    1. 用户发出真正的业务请求前先请求后端生成一个关联的token,保存到redis中,将token返回前端
    2. 请求携带此token发送给后端,后端去redis查,如果存在说明请求未处理,处理完删除redis中的token
    3. 如果后端未查到token,说明是重复请求,直接返回即可
@RestController
@RequestMapping("/repead-api")
public class TokenController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate UserService userService;// 前端发送请求前先申请一个唯一token@GetMapping("/generate-token")public Result generateToken() {// 生成唯一Token (这里使用UUID)String token = UUID.randomUUID().toString();// 存入Redis,设置过期时间stringRedisTemplate.opsForValue().set("req_token:" + token, "1", // 值可以是任意内容,我们只关心key是否存在60, // 60秒过期TimeUnit.SECONDS);return Result.ok(token);}// 处理业务请求@PostMapping("/actual-endpoint")public Result handleRequest(HttpServletRequest request) { // 1. 验证Token是否存在String token = request.getHeader("X-Request-Token");if (Boolean.FALSE.equals(redisTemplate.hasKey("req_token:" + token))) {// Token不存在或已使用,返回重复请求提示return Result.fail("重复请求,请勿重复提交");}try {// 2. token存在,执行业务,删除TokenstringRedisTemplate.delete("req_token:" + token);return userService.api();} catch (Exception e) {// 异常处理return Result.fail("服务器错误");}}
}

缓存查询

  • 目的:缓存查询的redi应用最广泛的功能,作为服务器和数据库间的缓存中间件,缓解了数据库高并发的风险

  • 原理

    1. 用户查询数据时,先查询redis中的数据,查询成功直接返回
    2. 如果redis查不到数据,再查数据库中的数据,查询成功后再将数据同步到redis(根据业务设置有效期)
    3. 尽量防止查询到数据库层面,否则有数据库宕机风险(缓存穿透,缓存击穿)
@Service
public class ShopServiceImpl implements IShopService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate ShopMapper shopMapper;@Overridepublic Result getShopById(Long id) {// 1.从redis查询商铺String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.判断商铺是否存在if (shopJson != null && !shopJson..trim().isEmpty()){// 3.商铺存在,则返回商铺信息Shop shop = JSON.parseObject(shopJson, Shop.class);return Result.ok(shop);}// 4.redis中没有商铺,则去数据库中查Shop shop = shopMapper.selectById(id);if (null == shop){// 5.数据库中也不存在,说明没有这个商铺,查询失败return Result.fail("商铺不存在");}// 6.数据库中可以查到,于是更新缓存并设置有效期stringRedisTemplate.opsForValue().set("cache:shop:"+id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);return Result.ok(shop);}
}

缓存更新

  • 目的:用户修改数据时,缓存也要同步更新

  • 原理

    1. 用户更新操作是少量的,可以直接操作数据库
    2. 更新完数据库后,直接删除对应缓存即可,无需手动更新,因为下次再查询此数据时会自动同步缓存
    3. 不能先删除缓存,否则高并发场景下,可能还没更新完数据库,方式并发查询导致缓存又同步了,导致缓存删除失败
@Service
public class ShopServiceImpl implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;@Override@Transactionalpublic Result modify(Shop shop) {//1.更新数据库int id = shopMapper.updateById(shop);System.out.println("数据库更新完成");//2.删除缓存stringRedisTemplate.delete("cache:shop:"+shop.getId());System.out.println("已删除缓存");return Result.ok(id);}

缓存穿透

  • 描述:用户查询的key在redis和数据库都不存在,会直接访问到数据库中,短时间高并发访问此数据会导致数据库宕机

  • 解决方法

    1. 建立空值缓存:如果用户查询到数据库还没有,就将此数据缓存到redis中,相应的值设为空或者约定的错误码
    2. 布隆过滤:不靠谱
    3. 限流:当请求量超过系统处理能力或者单个用户恶意发送大量请求时,限流会拒绝非法请求
@Service
public class ShopServiceImpl implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;@Overridepublic Result getShopById(Long id) {// 1.从redis查询商铺String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.判断商铺是否存在if (shopJson != null && !shopJson..trim().isEmpty()){// 3.商铺存在,则返回商铺信息Shop shop = JSON.parseObject(shopJson, Shop.class);return Result.ok(shop);}// 4.redis中没有商铺,则去数据库中查Shop shop = shopMapper.selectById(id);if (null == shop){// 5.数据库中也不存在,说明没有这个商铺,防止恶意请求攻击,对此请求中的id进行缓存空对象stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",60L,TimeUnit.MINUTES);return Result.fail("商铺不存在");}// 6.数据库中可以查到,于是更新缓存并设置有效期stringRedisTemplate.opsForValue().set("cache:shop:"+id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);return Result.ok(shop);}
}

缓存击穿

  • 描述:高并发场景下热点key过期,缓存还未来得及同步,大量请求就已经查询该数据,导致击穿到数据库

  • 解决方法

    1. 加分布式锁:第一个线程访问数据库时,对同步缓存过程加锁,保证同一时间只有一个线程访问数据库,此时其他线程无法持锁,访问失败
    2. 逻辑过期:热点key始终不过其,而是根据业务设置逻辑过期时间,用户访问时逻辑过期就更新缓存,实现缓存定时更新,这样做还可以在请求未能获得锁时不会失败而是返回旧数据,增强用户体验
    3. 限流:当请求量超过系统处理能力或者单个用户恶意发送大量请求时,限流会拒绝非法请求
  • 缓存击穿问题发生在极端情况下,普通查询不必考虑缓存击穿问题,只需考虑缓存穿透问题即可

// 缓存值包装类,包含逻辑过期时间
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class RedisShopData {private LocalDateTime expireTime;private Shop shop;
}@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;// 查询热点数据@Overridepublic Result queryHotSpot(Long id) { String threadId = String.valueOf(Thread.currentThread().getId()); // 当前线程IDtry {// 1.查询redisString shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.热点key不会过期,如果查不到,不用查询数据库,可以认为没有这个数据if (shopJson == null || shopJson.trim().isEmpty()){return Result.fail("没有这个店铺");}RedisShopData redisShopData = JSON.parseObject(shopJson, RedisShopData.class);Shop shop = redisShopData.getShop();LocalDateTime expireTime = redisShopData.getExpireTime();// 3.判断是否逻辑过期if (expireTime.isAfter(LocalDateTime.now())){return Result.ok(shop); //没有过期,直接返回}// 4.逻辑过期,准备拿锁进行缓存更新Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent("lock:shop:" + id, threadId, 10, TimeUnit.SECONDS);// 5.没有拿到锁,于是直接返回旧的数据if (!Boolean.TRUE.equals(ifAbsent)){return Result.ok(shop);}try{// 6.成功拿到锁,双重检查,防止数据已被其他线程更新String newShopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);RedisShopData newRedisShopData = JSON.parseObject(newShopJson, RedisShopData.class);if (newRedisShopData.getExpireTime().isAfter(LocalDateTime.now())) {return Result.ok(newRedisShopData.getShop()); //已经被更新,直接返回}// 7.开始查询数据库更新缓存,重置逻辑时间(一分钟)Shop newShop = shopMapper.selectById(id);newRedisShopData.setShop(newShop);newRedisShopData.setExpireTime(LocalDateTime.now().plusSeconds(60));stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSON.toJSONString(newRedisShopData));return Result.ok(newShop);} finally {// 7.释放锁,注意防止锁误删if (threadId.equals(stringRedisTemplate.opsForValue().get("lock:shop:" + id))){stringRedisTemplate.delete("lock:shop:" + id);}}} catch (Exception e) {return Result.fail("服务器错误");}}
}

缓存雪崩

  • 描述:缓存中的大量key同时过期或缓存服务宕机,导致所有请求直接打到数据库

  • 解决方法

    1. 大量key同时过期:在设置缓存时,给过期时间增加一个随机值,避免大量缓存同时失效、使用分级缓存
    2. redis宕机:搭建redis集群、或者使用熔断机制
    3. 限流策略
  • 缓存雪崩问题发生比缓存击穿发生概率更低,正常场景下只需考虑缓存穿透问题即可

1.背景问题:用户购买商品下单后,后台生成了临时订单,扣减了库存,但用户迟迟不付款,导致商品一直显示售空但实际上并没有

2.解决方案:用户下临时订单后,对临时订单设立逻辑有效期(这里可以用Set保存临时订单),再开启定时任务,定时扫描临时订单,如果扫描到订单过期,则执行关闭订单动作:回滚库存,删除临时订单

位图统计

  • 原理:对于布尔型数据,redis提供了Bitmap数据类型存储,占用空间极低

    1. 基本数据类型是用文本保存内容,如果存储布尔型数据就比较浪费空间
    2. Bitmap本质上也是字符串(key-value) , 但是它可以对value的位进行操作,每一位只能存储0和1
    3. Bitmap(key-value)的value位数会根据情况动态扩容
  • Bitmap常见命令

    SETBIT key offset value  # 设置偏移量为offset的位为0或1
    GETBIT key offset  # 获取偏移量为offset的位的值
    BITCOUNT key [start end]  # 统计指定字节范围内1的数量
    BITPOS key bit [start end]  # 查找第一个值为0或1的位的位置
    
  • 举例:签到系统

    1. Key:user:sign:{userId}:{yearMonth}(如 user:sign:1001:202507 表示用户 1001 在 2025 年 7 月的签到数据)
    2. Value:每一位代表一天,已签到则设为1(第 0 位 = 1 号,…,第 30 位 = 31 号)
    3. BITCOUNT user:sign:{userId}:{yearMonth}:统计当月签到次数
@Service
public class SignService {@Resourceprivate StringRedisTemplate redisTemplate;/*** 用户签到* @param userId 用户ID* @param date 签到日期(默认当天)* @return 是否签到成功(true=首次签到,false=已签到)*/public boolean doSign(Long userId, LocalDate date) {String key = String.format("user:sign:%d:%d%02d", userId, date.getYear(), date.getMonthValue());int offset = date.getDayOfMonth() - 1; // 计算偏移量(0=1号,1=2号,...)// 使用 BITFIELD 或 SETBIT 设置签到状态Boolean success = redisTemplate.opsForValue().setBit(key, offset, true);return Boolean.TRUE.equals(success); // 如果原值为false(未签到),返回true表示首次签到}/*** 检查用户某天是否已签到* @param userId 用户ID* @param date 查询日期* @return true=已签到,false=未签到*/public boolean checkSign(Long userId, LocalDate date) {String key = String.format("user:sign:%d:%d%02d", userId, date.getYear(), date.getMonthValue());int offset = date.getDayOfMonth() - 1;Boolean isSign = redisTemplate.opsForValue().getBit(key, offset);return Boolean.TRUE.equals(isSign);}/*** 统计用户当月签到次数* @param userId 用户ID* @param yearMonth 年月(如 "202507")* @return 签到次数*/public long getSignCount(Long userId, String yearMonth) {String key = String.format("user:sign:%d:%s", userId, yearMonth);return redisTemplate.execute(connection -> connection.bitCount(key.getBytes()));}/*** 获取用户当月连续签到天数* @param userId 用户ID* @param date 查询日期(默认当天)* @return 连续签到天数*/public long getContinuousSignCount(Long userId, LocalDate date) {String key = generateKey(userId, date);int daysInMonth = date.lengthOfMonth();int todayOffset = date.getDayOfMonth() - 1;long continuousCount = 0;for (int i = todayOffset; i >= 0; i--) {Boolean isSign = redisTemplate.opsForValue().getBit(key, i);if (Boolean.TRUE.equals(isSign)) {continuousCount++;} else {break;}}return continuousCount;}
}
http://www.xdnf.cn/news/1320499.html

相关文章:

  • 【redis、ruoyi-vue】基于ruoyi-vue实现数据redis的增删改查
  • Java面试宝典:Redis高级特性和应用(发布 订阅、Stream)
  • [python学习记录1]python简介
  • 最小路径和
  • 在职老D渗透日记day19:sqli-labs靶场通关(第26a关)get布尔盲注 过滤or和and基础上又过滤了空格和注释符 ‘)闭合
  • 线程(基本概念和相关命令)
  • LeetCode热题100--104. 二叉树的最大深度--简单
  • Rust:实现仅通过索引(序数)导出 DLL 函数的功能
  • STM32单片机学习日记
  • 网络常识-SSE对比Websocket
  • 记一次安装OpenStack(Stein)-nova报错问题解决
  • 数据赋能(396)——大数据——抽象原则
  • 智能汽车领域研发,复用云原生开发范式?
  • 48.Seata认识、部署TC服务、微服务集成
  • http工作流程
  • C++算法竞赛:位运算
  • 前端项目练习-王者荣耀竞赛可视化大屏 -Vue纯前端静态页面项目
  • 服务器管理与配置学习总结
  • MYSQL-175. 组合两个表
  • JavaScript性能优化实战(四):资源加载优化
  • LeetCode 837.新 21 点:动态规划+滑动窗口
  • 【数据结构】堆和二叉树详解——上
  • 旋钮键盘项目---foc讲解(闭环位置控制)
  • 学习Python中Selenium模块的基本用法(5:程序基本步骤)
  • Linux817 shell:until,nfs,random
  • 力扣438:找到字符串中所有的字母异位词
  • Django前后端交互实现用户登录功能
  • [python学习记录2]变量
  • 脉冲计数实现
  • Docker之自定义jkd镜像上传阿里云