Redis——基础篇
Redis介绍
-
Redis是一种NoSQL数据库,也称为缓存中间件
-
Redis和MySQL
- Redis保存半非结构化数据,MySQL保存结构化数据
- Redis运行在内存上,MySQL运行在硬盘上
- Redis语法不通用,MySQL遵循w3c的SQL语句
- Redis没有事务回滚功能,MySQL支持事务
-
Redis和ES
- Redis存储半结构化数据,ES存储非结构化数据,查询功能上ES更强大
- Redis完全运行在内存中,ES使用内存和磁盘的混合结构,性能上Redis更好,且Redis延迟更低
- Redis只能有限的分布式部署,ES天然具有分布式特性,数据量上ES更强大
- 高频读写、实时性要求高选择Redis;若需处理大规模数据、复杂搜索分析或数据可视化,选择Elasticsearch
-
Redis基本特征
- Redis运行在内存上,读写性能极高
- Redis默认单线程运行,并发访问时会严格顺序执行
- Redis虽然有持久化策略,但依然有数据丢失风险
- Redis支持多节点部署
-
应用场景
- 查询缓存
- 分布式锁
- 分布式会话
- 消息队列
- 地理空间
- 位图统计
数据类型
Commands | Docs (redis.io)
通用规则
-
Redis 是一个基于内存的键值对(Key-Value)数据库,其核心数据结构围绕键值对展开
- 一般情况下,约定key的类型为字符串,下述的数据类型是描述value的类型
- key的值可以用冒号隔开,默认形成层级结构方便管理(例如
项目:业务:类型:数据id
)
set "com:wyh:user:id" 1
-
redis的命令对大小写不敏感
-
redis不严格要求引号声明字符串,redis会自动转换,但建议统一使用双引号
set age 25 # 隐式转换,等效于 set "age" "25"
-
redis的结束符是换行符(回车),每一行就是一条命令,如果一次执行多条命令可以使用管道
#\n是换行符,redis执行到此就结束当前命令的执行 echo -e "SET key1 value1\nGET key1\nINCR counter" | nc localhost 6379
-
通用命令
select 数字
:表示选取几号数据库flushall
:清除当前数据库del key名[或者key列表]
:删除/批量keyexists key名[或者key列表]
:查询是否存在key,返回存在的key的数量type key名
:查看value数据类型expire key名 秒数
:给key设置有效期,到期后key会被自动删除ttl key名
:查看key剩余有效秒数keys 匹配表达式
:查看匹配的key,可以模糊匹配(如keys *yh
),不能跨库匹配**
String
-
字符串类型是redis的基本类型,Redis的所有数据类型都可以归结为字符串的变种
-
redis没有数值类型,声明数值以字符串形式声明,做数值运算时redis会隐式转换
-
应用场景
- 缓存热点数据(如 HTML 片段、用户信息)
- 分布式锁(通过
SET key value NX EX
实现互斥锁) - 计数器(如页面访问量、点赞数)
-
常见命令
-
set 键 值
:添加String类型的键值对或者修改String类型的键值对 -
get 键
:根据key获取String类型的value -
mset 键1 值1 键2 值2...
:批量添加多个String类型的键值对,如果该key已经存在则修改该key的value -
mget 键1 键2...
:根据多个key获取多个String类型的value -
incr 整数路径
:让一个整数形式的字符串自增+1,返回自增后的value -
incrby 整数路径 步长
:让一个整数形式的字符串按指定步长自增,返回自增后的value -
incrbyfloat 浮点数路径 步长
:让一个浮点数形式的字符串按照指定步长自增(incr可以为负数) -
setnx 键 值
:添加一个String类型的键值对,前提是这个key不存在,否则不执行(返回0) -
setex 键 值 秒数
:添加一个String类型的键值对,并指定有效期,如果此key存在则修改并指定有效期
-
#新增键值对
127.0.0.1:1145> set name wyh
OK
#获取value
127.0.0.1:1145> get name
"wyh"
#修改已经存在的键值对
127.0.0.1:1145> set name wyh1
OK
127.0.0.1:1145> get age 1
"24"
#整数形式字符串自增+1
127.0.0.1:1145> incr age
(integer) 25
#key为status不存在,添加成功
127.0.0.1:1145> setnx status 8
(integer) 1
#key为status已经存在,添加不执行,返回0
127.0.0.1:1145> setnx status 6
(integer) 0
#设置name的value为wyh3,并指定有效期为3秒
127.0.0.1:1145> setex name 3 wyh3
OK
List
-
redis中的List一个双向链表,支持正向索引,也支持反向索引
-
应用场景
- 消息队列(生产者-消费者模型)
- 最新消息列表(如用户最近的 100 条动态)
- 栈或队列(通过
LPUSH/RPOP
或RPUSH/LPOP
组合实现)
-
常见命令
-
lpush 键 元素 元素2...
:向列表左侧加入元素 -
lpop 键 count
:删除列表中键对应的左侧count个元素,没有就返回nil -
rpush 键 元素1 元素2...
:向列表右侧加入元素 -
rpop 键 count
:删除列表右侧开始count个元素,没有就返回nil -
lrange 键 开始游标 结束游标
:返回列表中指定的一段数据,游标从0开始 -
blpop 键 秒数 brpop 键 秒数
:删除列表左侧/右侧第一个元素,没有就等待指定时长,还没有就返回nil -
lindex <key><index>
:按照索引下标获得元素(从左到右) -
linsert <key> before <value><newvalue>
:在value的后面插入newvalue插入值 -
llen <key>
:获得列表长度 -
LSET key index element
:修改index位置的元素值 -
BLPOP key [key ...] timeout
:从左侧阻塞地弹出元素,若列表为空则阻塞,直到超时或有数据 -
BRPOP key [key ...] timeout
:从右侧阻塞地弹出元素(类似 BLPOP,但方向相反)
-
#左插多个元素
127.0.0.1:1145> lpush student wyh1 wyh2 wyh3 wyh4
(integer) 4
#lrange 键 0 -1:查看列表所有值
127.0.0.1:1145> lrange student 0 -1
1) "wyh4"
2) "wyh3"
3) "wyh2"
4) "wyh1"
#右插多个元素
127.0.0.1:1145> rpush student1 wyh1 wyh2 wyh3 wyh4
(integer) 4
127.0.0.1:1145> lrange student1 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"
4) "wyh4"
#删除列表左侧开始两个元素,返回删除的元素
127.0.0.1:1145> lpop student 2
1) "wyh4"
2) "wyh3"
127.0.0.1:1145> lrange student 0 -1
1) "wyh2"
2) "wyh1"
Hash
-
redis中的Hash是一个无序Map字典
-
应用场景
- 存储复杂对象(如用户信息、商品信息)
- 聚合统计(如记录商品的浏览次数、收藏量)
-
Hash常用命令
hset 键 字段 字段值
:添加或者修改一个键值对hget 键 字段
:查询hash的指定字段的值hmset 键 字段1 字段值1 字段2 字段值2...
:添加多个键值对hmget 键 字段1 字段2...
:查询多个指定hash字段的字段值hgetall 键
:获取一个键中所有的hash的字段:字段值hkeys 键
:获取一个键中所有的hash的字段hvals 键
:获取一个键中所有的hash的字段值hincrby 键 字段 步长
:让hash类型中某一个字段值根据步长自增,注意字段值只能是整数hsetnx 键 字段 字段值
:当hash类型不存在时才能添加hash类型,如果已经存在则返回0hexists <key1> <field>
:查看哈希表 key 中,给定域 field 是否存在
#添加键
127.0.0.1:1145> hset user name wyh1
(integer) 1
#查询键为user,值的字段为name的字段值
127.0.0.1:1145> hget user name
"wyh1"
#添加键为user,值为{name:wyh,age:24,gender:male}
127.0.0.1:1145> hmset user name wyh age 24 gender male
OK
#查询键为user,值中字段为name和age的字段值
127.0.0.1:1145> hmget study:redis:student:1 name age
1) "wyh"
2) "24"
#user已存在,不执行
127.0.0.1:1145> hsetnx user name wyh
(integer) 0
Set
-
redis的set是元素不重复的集合
-
应用场景
- 标签系统(记录用户的兴趣标签)
- 去重统计(如文章的唯一阅读用户)
- 社交网络关系(如关注列表、粉丝列表)
-
常见命令
-
sadd 键 元素1 元素2...
:向Set中添加一个或多个元素 -
srem 键 元素2 元素2...
:移除Set中指定的一个或多个元素 -
scard 键
:返回Set中元素的个数 -
sismember 键 元素
:判断元素是否在集合中 -
smembers 键
:获取集合中所有元素 -
sinter 键1 键2...
:求键1的set和键2的set交集 -
sdiff 键1 键2...
:求键1的set和键2的set差集 -
sunion 键1 键2...
:求键1的set和键2的set并集
-
#一次添加多个元素
127.0.0.1:6379> sadd name wyh1 wyh2 wyh3
(integer) 3
#集合不允许重复,插入已经存在的元素则不执行返回0
127.0.0.1:6379> sadd name wyh1
(integer) 0
#移除指定元素
127.0.0.1:6379> srem name wyh3
(integer) 1
#返回集合中元素数量
127.0.0.1:6379> scard name
(integer) 2
#判断指定元素是否在集合中,不在则返回0
127.0.0.1:6379> sismember name wyh1
(integer) 1
127.0.0.1:6379> sismember name wyh6
(integer) 0
Zset
-
Zset是可排序的set
- Zset底层通过跳表实现排序,元素带有score属性,score越大的元素在Zset的游标越大
- 底层通过哈希表实现查找
-
应用场景
- 排行榜(实时更新游戏玩家分数排名)
- 带权重的队列(按优先级处理任务)
- 时间轴(按时间戳排序的消息列表)
-
常见命令
-
zadd 键 score1 元素1 score2 元素2...
:添加一个或多个元素到Zset中,如果已经存在则修改score值 -
zrem 键 元素1 元素2...
:删除一个或多个指定元素 -
zrank 键 元素
:查看指定元素在集合中的排序 -
zcard 键
:获取Zset中元素的个数 -
zcount 键 min max
:统计score在min~max之间(包含min和max)的元素个数 -
zrangebyscore 键 min max
:获取score在min~max之间的元素 -
zrange 键 min max
:获取游标在min~max之间的元素 -
zincrby 键 元素 步长
:让集合指定元素的score根据步长自增 -
zdiff,zinter,zunion
:求差集、交集、并集
-
#添加多个元素到有序集合中
127.0.0.1:6379> zadd user 10 wyh1 20 wyh2 30 wyh3 40 wyh4
(integer) 4
127.0.0.1:6379> zrange user 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"
4) "wyh4"#删除有序集合中指定元素
127.0.0.1:6379> zrem user wyh4
(integer) 1
127.0.0.1:6379> zrange user 0 -1
1) "wyh1"
2) "wyh2"
3) "wyh3"#查看有序集合中元素的个数
127.0.0.1:6379> zcard user
(integer) 3#统计score值在指定范围内的元素个数
127.0.0.1:6379> zcount user 20 30
(integer) 2#获取score值在指定范围内的元素
127.0.0.1:6379> zrangebyscore user 20 30
1) "wyh2"
2) "wyh3"#让wyh1元素的score值自增2
127.0.0.1:6379> zincrby user 2 wyh1
"12"
Java客户端
-
常见客户端
- jedis:方法名称就是redis的命令名,使用简单
- lettuce:基于netty实现,支持redis的集群,支持同步、异步、响应式编程等高级功能
- redisson:基于分布式部署redis客户端
-
一般业务中推荐使用lettuce,分布式部署(如 集群模式、哨兵模式、主从架构)推荐使用redisson
Jedis
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
public class JedisConnectionPool { //Jedis线程不安全,必须使用连接池private static final JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();//最大连接数jedisPoolConfig.setMaxTotal(10);//最大空闲连接jedisPoolConfig.setMaxIdle(10);//最小空闲连接jedisPoolConfig.setMinIdle(10);//设置最长等待时间jedisPoolConfig.setMaxWaitMillis(10);//选择0号库,连接超时时长2000msjedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 2000, "123456", 0);}//从连接池中获jedis连接public static Jedis getJedis() throws Exception{return jedisPool.getResource();}//回收jedis连接public static void close(Jedis jedis){if (jedis != null){//Jedis连接池不会自动回收连接,需要手动归还jedis.close();}}
}
public class Main {public static void main(String[] args) {//获取连接池中的jedis连接对象Jedis jedis = null;try {jedis = JedisConnectionPool.getJedis();} catch (Exception e) {System.out.println("连接失败");}//开始进行数据操作String result = null;if (jedis != null) {result = jedis.set("name","wyh");System.out.println("插入/修改键值对name:wyh,返回结果是:\n"+result);String name = jedis.get("name");System.out.println("查询键为name的值为:\n"+name);}//归还连接JedisConnectionPool.close(jedis);}
}
SpringDataRedis
- Spring Data Redis是Spring官方为redis主流客户端的集成框架,同时提供了更丰富的功能
- 兼容jedis、lettuce(默认)等众多客户端
- 自带序列化和反序列化
- 支持redis发布订阅等高级功能
- 支持redis集群
<!--nosql-spring data redis-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--连接池门面,具体使用哪个连接池取决于使用哪种客户端,可以自动回收连接-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
<!--spring data redis默认使用lettuce,如果使用jedis需要在配置类中手动指定jedis(见下文)-->
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
- 单机模式
# 单机模式快速配置
spring:redis:host: 127.0.0.1port: 6379password: yourpassword
@Configuration
public class RedisConfig {// 使用 Jedis 连接池(需要额外加jedis依赖)@Beanpublic RedisConnectionFactory jedisConnectionFactory() {// 1. 配置 Redis 服务器信息RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();redisConfig.setHostName("localhost");redisConfig.setPort(6379);redisConfig.setPassword("yourpassword"); // 如果没有密码,可以不设置redisConfig.setDatabase(0); // 默认数据库// 2. 配置 Jedis 连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8); // 最大连接数poolConfig.setMaxIdle(8); // 最大空闲连接数poolConfig.setMinIdle(0); // 最小空闲连接数poolConfig.setMaxWaitMillis(1000); // 获取连接超时时间(1秒)// 3. 配置 Jedis 客户端选项JedisClientConfiguration jedisClientConfig = JedisClientConfiguration.builder().connectTimeout(Duration.ofSeconds(10)) // 连接超时时间.readTimeout(Duration.ofSeconds(5)) // 读取超时时间.usePooling() // 启用连接池.poolConfig(poolConfig) // 设置连接池配置.build();// 4. 创建 JedisConnectionFactoryreturn new JedisConnectionFactory(redisConfig, jedisClientConfig);}// 使用 Lettuce 连接池(默认)@Beanpublic RedisConnectionFactory lettuceConnectionFactory() {// 1. 配置 Redis 服务器信息RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();redisConfig.setHostName("localhost");redisConfig.setPort(6379);redisConfig.setPassword("yourpassword");redisConfig.setDatabase(0);// 2. 配置 Lettuce 连接池GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(8); // 最大连接数poolConfig.setMaxIdle(8); // 最大空闲连接数poolConfig.setMinIdle(0); // 最小空闲连接数poolConfig.setMaxWaitMillis(-1); // 获取连接超时时间(-1 表示无限等待)// 3. 配置 Lettuce 客户端选项(可选)SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)) // 连接超时时间.build();ClientOptions clientOptions = ClientOptions.builder().socketOptions(socketOptions).timeoutOptions(TimeoutOptions.enabled()) // 启用超时选项.build();// 4. 构建 Lettuce 客户端配置LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig) // 设置连接池.clientOptions(clientOptions) // 设置客户端选项.commandTimeout(Duration.ofSeconds(5)) // 命令超时时间.shutdownTimeout(Duration.ofMillis(100)) // 关闭超时时间.build();// 5. 创建 LettuceConnectionFactoryreturn new LettuceConnectionFactory(redisConfig, clientConfig);}/** * SpringBoot自动配置RedisTemplate/StringRedisTemplate,也可以做自定义增强* 如果使用多连接池,需要指定 RedisTemplate 使用哪个 ConnectionFactory* 1.修改客户端类型* 2.指定序列化策略* 3.配置redis高级功能(如发布订阅)*/@Beanpublic StringRedisTemplate stringRedisTemplate() {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(lettuceConnectionFactory()); // 明确指定 Lettuce//template.setConnectionFactory(jedisConnectionFactory()); // 也可以指定 Jedisreturn template;}@Beanpublic RedisTemplate<String, Object> redisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(lettuceConnectionFactory()); // 明确指定 Lettuce//template.setConnectionFactory(jedisConnectionFactory()); // 也可以指定 Jedis// 可选:配置序列化器template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}
}
- 哨兵模式配置
@Configuration
public class RedisClusterConfig {/*** 配置哨兵模式(支持哨兵 + 读写分离)* 哨兵监听主节点,从节点由主节点发现,因此只配置哨兵即可*/@Beanpublic RedisConnectionFactory redisConnectionFactory() {// 配置 Redis Sentinel(哨兵)RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration();sentinelConfig.setMaster("mymaster"); // 设置主节点名称sentinelConfig.setPassword("123456"); // 设置主节点密码sentinelConfig.sentinel("192.168.1.100", "26379"); //哨兵1sentinelConfig.sentinel("192.168.1.101", "26379"); //哨兵2// 配置 Lettuce 连接池GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);// 构建 Lettuce 客户端配置LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig) // 设置连接池.readFrom(ReadFrom.REPLICA_PREFERRED) // 优先从从节点读取.commandTimeout(Duration.ofSeconds(5)) // 命令超时时间.shutdownTimeout(Duration.ofMillis(100)) // 关闭超时时间.build()return new LettuceConnectionFactory(sentinelConfig, clientConfig);}/*** 配置 StringRedisTemplate*/@Beanpublic StringRedisTemplate stringRedisTemplate() {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory());template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}
}
- 分片模式配置
@Configuration
public class RedisClusterSentinelConfig {/*** 配置集群模式(支持哨兵 + 读写分离)* 集群模式无需配置哨兵,redis会自动在主从节点间故障转移,从节点由主节点发现,只配置主节点即可*/@Beanpublic RedisConnectionFactory redisConnectionFactory() {// 集群配置List<String> masterNodes = Arrays.asList("192.168.1.100:6379", "192.168.1.101:6379", "192.168.1.102:6379");RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(masterNodes);clusterConfig.setPassword("123456");// 客户端配置LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离.build();return new LettuceConnectionFactory(clusterConfig, clientConfig);}@Beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}
}
RedisTemplate
-
SpringDataRedis
提供了RedisTemplate
工具类,其中封装并统一了对redis的各种操作API类 说明 redisTemplate.opsForValue()
操作string类型 redisTemplate.opsForHash()
操作hash类型 redisTemplate.opsForList()
操作List类型 redisTemplate.opsForSet()
操作set类型 redisTemplate.opsForZSet()
操作Zset类型 redisTemplate
通用操作redis命令
public class SpringDataRedisTest {@Resourceprivate RedisTemplate<String,Object> redisTemplate;@Testvoid testRedisString(){try {//写入一条数据,redisTemplate入参是对象会自动序列化为字节数组redisTemplate.opsForValue().set("user",new User());//获取一条数据User user =(User) redisTemplate.opsForValue().get("user");} catch (Exception e) {System.out.println("连接失败");throw new RuntimeException(e);}}
}
StringRedisTemplate
-
StringRedisTemplate
和RedisTemplate
:StringRedisTemplate
默认使用JDK序列化,Redis保存的是不可读的字节数组,手动修改序列化器又消耗了空间性能StringRedisTemplate
使用StringRedisSerializer
序列化,Redis保存的是可读文本,可以存入Json字符串
-
通用API(操作key)
stringRedisTemplate.delete(String key)
:删除键stringRedisTemplate.delete(Collection<String> keys)
:批量删除键stringRedisTemplate.hasKey(String key)
:键是否存在stringRedisTemplate.expire(String key, long timeout, TimeUnit unit)
:设置键的过期时间stringRedisTemplate.getExpire(String key, TimeUnit unit)
:获取键的剩余生存时间(TTL)stringRedisTemplate.rename(String oldKey, String newKey)
:重命名键stringRedisTemplate.renameIfAbsent(String oldKey, String newKey)
:键不存在时才重命名
-
String-API
StringRedisTemplate.opsForValue().set(K key, V value, long timeout, TimeUnit unit)
:新增/修改值StringRedisTemplate.opsForValue().get(K key)
:获取key下的值StringRedisTemplate.opsForValue().increment(K key, D delta)
:key下的值自增deltaStringRedisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit)
:setnx,值不存在时才新增StringRedisTemplate.opsForValue().multiSet(Map<String, String> map)
:批量set多个值StringRedisTemplate.opsForValue().multiGet(Collection keys)
:获取多个值,以List
类型接收
-
List-API
StringRedisTemplate.opsForList().leftPush/rightPush(K key, V value)
:左插/右插StringRedisTemplate.opsForList().leftPushIfPresent/rightPushIfPresent(K key, V value)
:如果存在则添加元素StringRedisTemplate.opsForList().leftPop/rightPop(K key)
:移除左/右边元素StringRedisTemplate.opsForList().leftPop/rightPop(K key, long timeout, TimeUnit unit)
:移除失败则返回null
StringRedisTemplate.opsForList().range(K key, long start, long end)
:获取指定区间的值,以List
接收StringRedisTemplate.opsForList().size(K key)
:获取key下的List
长度StringRedisTemplate.opsForList().index(K key, long index)
:获取游标位置的元素StringRedisTemplate.opsForLis().set(K key, long index, V value)
:指定位置插入元素
-
Hash-API
StringRedisTemplate.opsForHash().put(H var1, HK var2, HV var3)
:插入/修改hashStringRedisTemplate.opsForHash().putIfAbsent(H key, HK var2, HV var3)
:如果存在hash则插入StringRedisTemplate.opsForHash().putAll(H key, Map<? extends HK, ? extends HV> map)
:批量插入/修改StringRedisTemplate.opsForHash().get(H var1, Object var2)
:获取key下键对应的值,返回Object
,需要再手动强转StringRedisTemplate.opsForHash().multiGet(H key, Collection vals)
:获取key下的多个键vals对应的值,以List
接收StringRedisTemplate.opsForHash().entries(H key)
:获取key下Hash的所有键值对,以Map
接收StringRedisTemplate.opsForHash().keys(H key)
:获取key下Hash的所有键,以Set
接收StringRedisTemplate.opsForHash().values(H key)
:获取key下Hash的所有值,以List
接收StringRedisTemplate.opsForHash().delete(H key, Object var1 …)
: 根据key下的Hash中的键var1…,删除Hash的键值对StringRedisTemplate.opsForHash().increment(H key, HK var2, long long1)
:key下的键var2对应的值自增long1StringRedisTemplate.opsForHash().size(K key)
:获取key下的Hash长度
-
Set-API
StringRedisTemplate.opsForSet().add(K var1, V... var2)
:添加一个或多个元素StringRedisTemplate.opsForSet().members(K key)
:获取key下的Set所有元素StringRedisTemplate.opsForSet().size(K key)
:获取key下的Set长度StringRedisTemplate.opsForSet().remove(K var1, Object... var2)
:移除Set成员元素StringRedisTemplate.opsForSet().isMember(K var1, Object var2)
:判断是否包含元素StringRedisTemplate.opsForSet().intersect(K var1, K var2)
:计算多个集合的交集,并返回结果集合StringRedisTemplate.opsForSet().union(K var1, K var2)
:计算多个集合的并集,并返回结果集合StringRedisTemplate.opsForSet().difference(K var1, K var2)
:计算两个集合的差集,并返回结果集合
-
Zset-API
StringRedisTemplate.opsForZSet().add(K var1, V var2, double var3)
:添加一个成员,同时指定该成员的分数StringRedisTemplate.opsForZSet().range(K var1, long var2, long var4)
:按索引范围查询,返回由低到高排序的集合StringRedisTemplate.opsForZSet().reverseRange(K var1, long var2, long var4)
:按索引范围查询,由低到高stringRedisTemplate.opsForZSet().rangeByScore("key", minScore, maxScore)
:按score范围查询,包含边界stringRedisTemplate.opsForZSet().rangeByScore("key", minScore, maxScore, 0, -1)
:按score范围查询,不含边界StringRedisTemplate.opsForZSet().zCard(K var1)
:获取有序集合中的成员数StringRedisTemplate.opsForZSet().remove(K var1, Object... var2)
:移除ZSet成员StringRedisTemplate.opsForZSet().incrementScore(K var1, V var2, double var3)
:指定成员的分数增加指定步长StringRedisTemplate.opsForZSet().count(K var1, double var2, double var4)
:返回score范围内的成员数量StringRedisTemplate.opsForZSet().rank(K var1, Object var2)
:获取指定成员在有序集合中的排名,以long
接收
@SpringBootTest
public class TestRedis {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Test// 测试redis连接public void testConnect() {String result = stringRedisTemplate.execute(RedisConnection::ping);Assert.assertEquals(result, "PONG"); //ping-pong检测}@Testpublic void test(){try {ObjectMapper objectMapper = new ObjectMapper();//写入一条数据String userDTO = objectMapper.writeValueAsString(new UserDTO());stringRedisTemplate.opsForValue().set("user",userDTO);//获取一条数据String userStr = stringRedisTemplate.opsForValue().get("user");UserDTO user = objectMapper.readValue(userStr, UserDTO.class);} catch (Exception e) {System.out.println("连接失败");throw new RuntimeException(e);}}
}
Redisson
- redisson包含了Spring Data Redis依赖,但排除了jedis和lettuce客户端,如果要使用还是要加入Spring Data Redis依赖
- redisson内置了连接池,无需额外配置数据源
<!-- Redisson 官方 Starter(推荐) -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.4</version> <!-- 使用最新版本 -->
</dependency>
@Configuration
public class RedissonConfig {@Bean("singleRedissonClient") //单机模式public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456").setDatabase(0);return Redisson.create(config);}@Bean("SentinelRedissonClient") //哨兵模式,只需要配置哨兵public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useSentinelServers().addSentinelAddress("redis://192.168.1.1:26379", "redis://192.168.1.2:26379") //两个哨兵.setMasterName("master") // master节点名称,节点地址在哨兵中已配置好.setPassword("yourpassword"); // master节点密码.setSentinelPassword("sentinel-password") // Sentinel 节点的密码(可选).setDatabase(0);return Redisson.create(config);}@Bean("ClusterRedissonClient") //集群模式,只需要配置主节点public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec()) //以文本形式保存,否则redisson默认保存字节数组.useClusterServers() .addNodeAddress("redis://192.168.1.1:6379", "redis://192.168.1.2:6379") //两个主节点.setPassword("yourpassword") //Redis 集群模式下,所有主节点和从节点的密码必须完全一致.setDatabase(0);return Redisson.create(config);}
}
Redis应用
分布式ID
-
目的:分布式环境下,防止ID重复
-
原理:推荐雪花算法思想,可以将本地序列号改为redis自增生成序列号
- 第一个部分:只有一个符号位且永远是0,表示id永远是正数,如果使用无符号
long
,可以忽略这一位 - 第二部分:31位,表示当前时间的时间戳
- 第三部分:32位,使用redis的incrby命令递增生成序列号
- 其他:如果业务有相关需求,可以再将相关数据加入其中(例如可以再拼接机器信息)
- 第一个部分:只有一个符号位且永远是0,表示id永远是正数,如果使用无符号
-
替代方案
- UUID:可能重复,但概率极低
- 雪花算法(推荐):时间严格递增、工作节点 ID 唯一,因此几乎不会重复,早期存在时钟回拨的问题,但已修复
- Leaf(美团开源方案):几乎不会重复
-
举例:分布式ID生成器
@Component
public class RedisIdUtil {// 时间戳位数private static final int TIMESTAMP_BITS = 31;// 序列号位数private static final int COUNT_BITS = 32;@Autowiredprivate StringRedisTemplate stringRedisTemplate;// 入参是业务前缀,用于区分不同的业务,key格式为 service:日期:id(防止多个业务共用一个ID生成器)public Long getId(String service){LocalDateTime now = LocalDateTime.now();// 1.生成时间戳long timestamp = now.toInstant(ZoneOffset.UTC).toEpochMilli();// 2.生成序列号,不同的业务有不同的id,创建一个键专门存储订单数量String day = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); //精确到天,每天更新id,如果键不存在,则redis会自动创建这个key并初始值取0long count = stringRedisTemplate.opsForValue().increment(servicePrefix + ":" + day +":id"); // 3.拼接return timestamp << COUNT_BITS | count;}
}
分布式会话
-
目的:防止session失效
-
原理
- 用户首次登录时,后台生成唯一的
sessionID
作为key,用户信息作为value保存在redis中,返回sessionID
- 用户再发出请求时
authorization
携带sessionID
,后台查询redis中key为sessionID
下的用户信息,如果无效就返回重新登录,如果验证用户信息有效,还需要根据业务场景延长有效期 - 验证通过后,可以将用户信息保存在ThreadLocal中,便于后续复用用户信息
- 用户首次登录时,后台生成唯一的
-
替代方案
- JWT
- Token(OAuth2)
-
举例:分布式会话中,实现同一用户指定时间内粘性登录
- 要求:有的请求需要验证登录,用的不需要;但如果已经登录,不需要验证登录的请求也会重置登录有效期
- 为更好维护,可以使用两个拦截器:第一个拦截所有请求重置有效期,第二个拦截需要验证身份的请求验证登录
//临时保存token的ThreadLocal类
public class ThreadLocalDto {public static final ThreadLocal<UserDTO> threadLocal = new ThreadLocal<UserDTO>();}
//重置有效期拦截器,如果已经登录就重置有效期再放行,其他情况直接放行
@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler) throws Exception {//1.获取tokenString token = req.getHeader("authorization");//2.如果不存在说明没有登录过,给登录拦截器处理if (null == token || token.isEmpty()){return true;}//3.token存在,说明用户登录过,判断用户信息是否过期String userDTOStr = stringRedisTemplate.opsForValue().get("session:"+token);//4.如果过期就放行,让登录验证处理if (userDTOStr == null || userDTOStr.isEmpty()){return true;}//5.如果没有过期,就保存用户信息到ThreadLocal中,并重置有效期60分钟UserDTO userDTO = JSON.parseObject(userStr, UserDTO.class)ThreadLocalDto.threadLocal.set(userDTO);stringRedisTemplate.expire("login:token:"+token, 60, TimeUnit.MINUTES);return true;}}//需要登录的请求再走登录验证拦截器
@Component
public class LoginInterceptor implements HandlerInterceptor {//请求执行前置拦截,实现校验登录功能@Overridepublic boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler) throws Exception {//1.判断是否登录,即判断ThreadLocal是否有值if (ThreadLocalDto.threadLocal.get() == null){//2.如果没有值,说明用户没有登录或者登录过期,拦截让用户重新登录res.setStatus(401);return false;}//3.如果有值,说明用户已经登录了,于是放行return true;}//请求执行完毕后置处理,移除用户信息,避免内存泄露@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {ThreadLocalDto.threadLocal.remove();}
}
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {//RefreshTokenInterceptor先执行,拦截所有请求,如果已登录检查有效期registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);//LoginInterceptor后执行,排除不用登录验证的请求registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code", //验证码请求不用拦截"/user/login", //登录请求不用拦截).order(1);}
}
验证码登录
-
目的:验证码验证身份
-
原理
- 验证码发送后,将验证码存在redis中设置有效期,有效期内验证成功即可
- 使用redis记录用户发送验证码次数,如果次数过多就拒绝请求,前端也可以先实现请求间隔限制,做第一层防护
-
举例:实现验证码登录功能
- 验证码发送后指定时间内有效
- 验证码不能频繁发送
- 用户数量一般不会达到海量级别,Session登录的token直接使用UUID即可
@Service
public class UserService implements IUserService{@Autowiredprivate UserMapper userMapper;@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** 生成指定位数的随机数字验证码*/private String generateCode(int length) {StringBuilder code = new StringBuilder();Random random = new Random();for (int i = 0; i < length; i++) {code.append(random.nextInt(10)); // 0-9的随机数}return code.toString();}/*** 发送验证码功能*/@Overridepublic Result sendCode(String phone) {// 1.校验手机号,使用正则表达式来匹配if (!RegexUtils.isPhoneInvalid(phone)){return Result.fail("无效的手机格式");}// 2.检查是否请求次数过多,设置key统一为login:code:count:phoneString count = stringRedisTemplate.opsForValue().get("login:code:count:" + phone );if (null != count && Integer.parseInt(count) >= 3) {return Result.fail("超过今日的发送次数");}// 3.开始验证码流程,设置请求次数计数器if (null == count){// 验证次数每天刷新,每天限制三条stringRedisTemplate.opsForValue().set("login:code:count" + phone, "1", 24, TimeUnit.HOURS); }else {stringRedisTemplate.opsForValue().increment("login:code:count" + phone, 1);}// 4.生成并发送一个验证码,保存验证码到redis,有效期五分钟String code = generateCode(6);stringRedisTemplate.opsForValue().set("login:code:" +phone,code, 2, TimeUnit.MINUTES);// 5.发送验证码// TODOreturn Result.ok();}/*** 登录功能*/@Overridepublic Result login(UserDTO userDTO) {String phone = userDTO.getPhone();// 1.校验手机号if (!RegexUtils.isPhoneInvalid(phone)){return Result.fail("无效的手机格式");}// 2.从redis取出校验验证码验证String code = stringRedisTemplate.opsForValue().get("login:code:" + phone);if (null == code || !userDTO.getCode().equals(code)){return Result.fail("验证失败或验证码已过期");}UserPO userPO = userMapper.getUserByPhone(phone);// 3.如果用户不存在,则注册用户if (null == userPO){userPO = BeanUtil.copyProperties(userDTO, userPO.class);userMapper.insert(userPO);}// 4.生成分布式会话token保存在redis中String token = UUID.randomUUID().toString(true);String jsonUserDto = JSON.toJSONString(userDTO);stringRedisTemplate.opsForValue().set("login:token:"+token, jsonUserDto, 60L, TimeUnit.MINUTES);// 7.返回给用户token,下次请求带此token,实现粘性登录return Result.ok(token);}
}
分布式锁
- 目的:分布式环境下,需要支持多节点共享锁资源
setnx分布式锁
-
原理
- 使用redis模拟锁的生成,线程访问资源前先
setnx
实现持锁过程 - 为了防止死锁,分布式锁可以设置有效期,但这样要注明线程标识,防止锁自动失效导致其他线程误删
- 使用redis模拟锁的生成,线程访问资源前先
-
替代方案
- 复杂场景可以使用ZooKeeper或者Redisson实现分布式锁
- 可以使用原子性操作,例如插入脚本、使用redisson分布式集合、无锁算法等
- 简单场景下可以直接使用数据库的事务功能保证原子性
-
举例:使用分布式锁解决多端登录问题
- 业务要求,用户可以多端同时登录,但指定业务同一时间只能在一台设备上操作(例如抢购、支付等功能)
- 注意防止锁误删:请求A执行时间过长导致锁自动释放,请求B持锁时,请求A完成导致释放请求B的锁
@RestController
@RequestMapping("/voucher-order")
public class OrderController {@Autowiredprivate IOrderService orderService;@Autowiredprivate StringRedisTemplate stringRedisTemplate;//抢购功能同一时间只能在一态设备上操作@PostMapping("order/{id}")public Result seckillOrder(@PathVariable("id") Long goodId, HttpServletRequest request) {// 获取用户信息,如果未登录会提前被拦截Long userId = ThreadLocalDto.threadLocal.get().getId();// 本线程标识String threadId = UUID.randomUUID().toString();// 拿分布式锁,value为线程标识,防止锁自动释放导致锁误删Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent("lock:order:" + userId, threadId, 5, TimeUnit.MINUTES);if (!ifAbsent) { // 未拿到锁,说明其他设备正在操作本业务return Result.fail("其他设备正在处理,请重试");}try{ // 拿到锁,开始执行业务return orderService.panicBuy(goodId);}finally{// 释放锁前先检查是否是自己的锁,这两步也可以使用LUA脚本严格保证原子性String redisThreadId = stringRedisTemplate.opsForValue().get("lock:order:" + userId);if(redisThreadId.equals(threadId)){stringRedisTemplate.delete("lock:order:"+userId);}}}
}
LUA脚本
-
目的:setnx分布式锁无法满足一些业务场景时,可以使用LUA脚本操作redis
-
原理
- LUA脚本具有原子性
- 如果业务逻辑较复杂,无法使用LUA脚本一次性完成,可以使用Zookeeper或者Redisson的分布式锁功能
-
StringRedisTemplate
调用LUA脚本stringRedisTemplate.execute(RedisScript<T> var1, List<K> var2, Object... var3); // List<String> keys: 代表 Redis 键(Key),是 Lua 脚本中通过 KEYS[1], KEYS[2]... 访问的变量 // Object... args: 代表额外参数,是 Lua 脚本中通过 ARGV[1], ARGV[2]... 访问的变量
-
LUA脚本
- 行结束符是换行(回车)
- 局部变量以
local
修饰,全局变量不加修饰符 - 字符串以单引号或者双引号表示,用
..
拼接字符串 - LUA脚本默认以字符串接收参数,
tonumber(str)
转换为数值,tostring(num)
转换为字符串,str == "true"
表示布尔值
-- LUA接收调用参数 local key = KEYS[1] -- 对应 Java 代码中的 keys.get(0) local value = ARGV[1] -- 对应 Java 代码中的 args[0] local bool = (key == "true") -- 布尔值没有专门的函数,可以使用逻辑表达式转换 local num = tonumber(value) -- 数字类型需要转换 -- LUA调用redis命令 redis.call("命令名称","key","value",...,"其他参数") -- redis.pcall作用和redis.call相同,单可以捕获异常信息 local ifSuccess, err = redis.pcall("SET", "mykey", "Hello") -- ifSuccess布尔值表示执行结果,如果有异常err会捕获信息 if not success thenprint("Error:", err) elseprint("Success:", success) end
-
举例:高并发抢购商品,使用LUA脚本防止超卖
- 使用
setnx
分布式锁不合适,如果用户抢购时未成功持锁就直接失败,会影响用户体验,即系统显得不可用 - 使用LUA脚本扣减库存保证原子性
- 使用
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {// 1.查询抢购相关信息/*商品信息以Hash保存*key为good:goodId*value为{beginTime:begin,end:endTime,stoke:stoke}*/String begin = (String) stringRedisTemplate.opsForHash().get("good:" + goodId, "beginTime");String end = (String) stringRedisTemplate.opsForHash().get("good:" + goodId, "endTime");Long userId = ThreadLocalDto.threadLocal.get().getUserId();// 2.不在抢购期内直接失败if (LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 3.调用lua脚本开始抢购,尝试扣减库存Long luaResult = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.singletonList(goodId));if (luaResult == -1) {// 没有库存,抢购失败return Result.fail("抢购失败");}// 4.库存扣减成功,创建临时订单存入redis缓存,这一步也可以异步完成提高效率/*使用Set保存订单ID便于查询用户名下的订单,使用String保存订单具体内容便于删除*key为order:user:userId,value为订单ID的集合*key为order:orderId,value为订单内容*/Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idStringRedisTemplate.opsForSet().add("order:user:"+userId, String.valueOf(orderId));OrderDto orderDto = OrderDto.builder() //OrderDto使用Lombok或手动实现了Builder 模式.orderId(orderId).goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();stringRedisTemplate.opsForValue().set("order:"+orderId, JSON.toJSONString(orderDto));// 5.返回订单idreturn Result.ok(orderId);}
}
local goodId = KEYS[1] -- 抢购商品id
local stock = redis.call('hget', 'good:' .. goodId, 'stock') -- 扣减库存
if(tonumber(stock) <= 0) then --库存不足,下单失败,返回-1return -1
end
redis.call('hincrby', 'good:' .. goodId, 'stock',- 1) -- 库存充足,扣减库存
return 0 -- 扣减成功,返回0
Redlock算法
-
目的:传统setnx分布式锁在redis集群环境中可能因为主节点宕机,从节点未来得及同步,导致分布式锁失效,可使用Redlock算法
-
原理
- 多数派机制:对每个节点都setnx加锁,如果成功节点数 ≥ N/2 + 1,则认为加锁成功
- 独立时钟校验:每个节点setnx加锁时,记录各自的过期时间,保证所有节点的锁ttl同时失效
- 释放锁:向所有节点发送 Lua 脚本解锁(仅删除自己持有的锁)
-
其他方案
- RedLock适用于redis集群架构,建议使用Redisson的分布式锁
- 可以使用zookeeper加分布式锁
@Configuration
// 配置redis集群
public class RedisClusterConfig {// 1. 定义多个 Redis 连接工厂@Beanpublic RedisConnectionFactory redisConnectionFactory1() {return new LettuceConnectionFactory("127.0.0.1", 6379);}@Beanpublic RedisConnectionFactory redisConnectionFactory2() {return new LettuceConnectionFactory("127.0.0.1", 6380);}@Beanpublic RedisConnectionFactory redisConnectionFactory3() {return new LettuceConnectionFactory("127.0.0.1", 6381);}// 2. 定义多个 StringRedisTemplate,并交给 Spring 管理@Beanpublic StringRedisTemplate redisTemplate1(RedisConnectionFactory redisConnectionFactory1) {return new StringRedisTemplate(redisConnectionFactory1);}@Beanpublic StringRedisTemplate redisTemplate2(RedisConnectionFactory redisConnectionFactory2) {return new StringRedisTemplate(redisConnectionFactory2);}@Beanpublic StringRedisTemplate redisTemplate3(RedisConnectionFactory redisConnectionFactory3) {return new StringRedisTemplate(redisConnectionFactory3);}// 3. 注入 List<StringRedisTemplate>@Beanpublic List<StringRedisTemplate> redisTemplates() {return List.of(redisTemplate1(), redisTemplate2(), redisTemplate3());}
}
@Component
public class RedLockUtil {private final List<StringRedisTemplate> redisTemplates; // 多个 Redis 节点private final int quorum; // 法定数量(大多数)@Autowired // 可以省略(Spring 4.3+ 自动注入构造函数)public RedLockUtil(List<StringRedisTemplate> redisTemplates) {this.redisTemplates = redisTemplates;this.quorum = redisTemplates.size() / 2 + 1; // 在构造时计算}public boolean tryLock(String lockKey, long expireTime, long retryDelay, int maxRetries) {String lockValue = UUID.randomUUID().toString(); // 唯一标识,防止误删其他客户端的锁for (int i = 0; i < maxRetries; i++) {int successCount = 0;long startTime = System.currentTimeMillis();// 遍历所有 Redis 节点,尝试获取锁for (StringRedisTemplate redisTemplate : redisTemplates) {Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {successCount++;}}// 计算获取锁的总耗时long elapsed = System.currentTimeMillis() - startTime;// 如果大多数节点获取成功,并且总耗时小于锁过期时间,则认为加锁成功if (successCount >= quorum && elapsed < expireTime) {return true;}// 否则,释放所有节点上的锁(避免部分节点获取成功但最终失败)unlockInner(lockKey, lockValue);// 等待一段时间后重试(避免活锁)try {Thread.sleep(retryDelay + (long) (Math.random() * retryDelay));} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}return false;}public void unlock(String lockKey, String lockValue) {for (StringRedisTemplate redisTemplate. : redisTemplates) {// 建议下面改用 Lua 脚本确保原子性if (lockValue.equals.(redisTemplate.opsForValue().get(lockKey))){redisTemplate.delete(lockKey);}}}
}
任务队列
-
目的:实现消息队列功能
-
替代方案
- 使用Redisson的
RTopic
功能 - 使用MQ
- 使用Redisson的
消息队列
-
特点
- 插入或获取操作立即返回,如果获取失败返回nil
- 要求redis5以上,支持消息持久化、支持ACK 机制、支持消息回溯、支持消费分组
-
Redis 5.0+参考kafka思想,引入了专门为消息队列设计的Stream数据结构
- Stream:类似List类型,value是链表,用于存储消息,每个消息节点 = 消息ID(可自动生成)+ 消息内容(键值对表示)
- Consumer Group:消费者组,同步存储对应key下Stream中的消息,主要有以下功能
- 消息分发:将 PEL 中的未确认消息轮流分配给消费者,确保消息不重复分发,保证消费者组内的负载均衡
- 进度管理:消费者组记录最后处理成功的消息 ID,确保新加入的消费者从正确位置开始消费
- 消息重试:费者读取消息后,若未确认,消息会保留在 PEL 中,超时可以重新分配
- 消息回溯:支持从任意位置重新消费
- Pending Entries List (PEL机制):每个消费者组维护一个 PEL,并记录未确认消息
-
原理:生产者—指定Strem组—指定消费组—消费者
-
发送消息:生产者调用
XADD
命令向 Stream 添加消息,Stream 中每个节点存储一条消息XADD order_stream * product "iPhone" amount 2 customer "Alice" # order_stream:Stream 的键名 # *:让 Redis 自动生成消息 ID # product "iPhone" amount 2 customer "Alice" :消息内容(键值对形式)
-
创建消费者组:消费者组首次消费前,需调用
XGROUP CREATE
创建组XGROUP CREATE order_stream order_group $ MKSTREAM # order_stream:Stream 键名 # order_group:消费者组名 # $:从最新消息开始消费,或 0 从最早开始 # MKSTREAM:如果 Stream 不存在则自动创建
-
获取消息:消费者的消费操作
XREADGROUP
会自动注册到消费者组中,XREAD
是消费者独立读取(不建议,可能会重复消费)XREADGROUP GROUP order_group consumer1 COUNT 1 STREAMS order_stream > # order_group:消费者组名 # consumer1:消费者名称(需唯一,如基于进程 ID 或机器名) # COUNT 1:每次读取 1 条消息 # >:仅读取未分配给其他消费者的新消息
-
消息确认:消费者消费完消息后,调用
XACK
确认消息,PEL 会自动移除该消息,表示消息已成功处理XACK order_stream order_group 1630000000000-0 # order_stream:Stream 键名 # order_group:消费者组名 # 1630000000000-0:消息 ID
-
消息重试:消息超时未确认,可以调用
XCLAIM
将消息重新分配给其他消费者XCLAIM order_stream order_group new_consumer 300000 1630000000000-0 # order_stream:Stream 键名 # order_group:消费者组名 # new_consumer:新的消费者名称 # 300000:消息最小未确认时间(毫秒,即 5 分钟)
-
消息回溯:
XREAD
或XREADGROUP
指定起始 IDXREAD COUNT 10 STREAMS order_stream 1630000000000-0 # 从指定消息 ID 开始读取历史消息 # COUNT 10:限制最多返回 10 条消息
-
-
RedisTemplate
对应APIStringRedisTemplate.opsForStream().add(String key, Map<String, String> value)
:提交消息到指定streamStringRedisTemplate.opsForStream().createGroup(String streamkey, String groupKey)
:创建消费者组StringRedisTemplate.opsForStream().read(Consumer cons, StreamReadOptions Opt, StreamOffset streams)
:获取消息StringRedisTemplate.opsForStream().acknowledge(String streamkey, String groupKey, String... recordIds)
:确认ack
@Service
public class OrderService{ //生产者@Autowiredprivate StringRedisTemplate redisTemplate;public void addToStream(OrderDto orderDto) {String orderTask = JSON.toJSONString(orderDto);Map<String, String> orderMap = JSON.parseObject(orderTask, new TypeReference<Map<String,String>>() {});;// 添加到 Stream(如果Stream不存在会自动创建),自动生成消息IDRecordId recordId = stringRedisTemplate.opsForStream().add("order_stream", orderMap);}
}
@Service
public class OrderQueueConsumer{private final StringRedisTemplate stringRedisTemplate;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate stringRedisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.stringRedisTemplate = stringRedisTemplate;this.consumerPool = consumerPool;initializeConsumerGroup();consumerPool.execute(this::readMessagesFromStream);}private void initializeConsumerGroup() {// 检查消费者组是否已存在(Stream会自动创建,但消费者组不会自动创建)List<StreamGroupInfo> groups = stringRedisTemplate.opsForStream().listGroups("order_stream");boolean groupExists = groups.stream().anyMatch(g -> GROUP_NAME.equals("order_group"));if (!groupExists) {stringRedisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);} }/*** 正常消费消息(Pending 列表)*/public void readMessagesFromStream() { //消费消息// 在调用 read() 前,始终通过 createGroup() 确保消费者组存在stringRedisTemplate.opsForStream().createGroup("order_stream", "order_group");while (true){// 从 ID "0" 开始读取,最多 10 条消息List<MapRecord<String, Object, Object>> messages = stringRedisTemplate.opsForStream().read(Consumer.from("order_group", "consumer1"), // 消费者组StreamReadOptions.empty().count(10).block(Duration.ofSeconds(0)), // 非阻塞,-1表示一直阻塞StreamOffset.create("order_stream", ReadOffset.lastConsumed()); // 从上次消费的位置继续);if (messages == null || messages.isEmpty()) {// 短暂休眠,避免频繁轮询Thread.sleep(1000);}messages.forEach(message -> {// 模拟业务处理(可能成功或失败)if(simulateBusinessProcessing(message)){// 处理成功,确认消息stringRedisTemplate.opsForStream().acknowledge("order_stream", "order_group", message.getId());}else {// 处理失败,不 ACK(消息会留在 Pending 列表,下次重试)System.out.println("Message " + messageId + " processing failed. Will retry later.");throw new RuntimeException("Processing failed");}});}}/*** 模拟业务处理(随机成功或失败)*/private boolean simulateBusinessProcessing(Map<Object, Object> values) {// 模拟随机失败(实际业务中替换为真实逻辑)return Math.random() > 0.3; // 70% 成功率}/*** 处理未确认的消息(Pending 列表)*/@Asyncprivate void readPendingMessagesFromStream() {List<MapRecord<String, Object, Object>> readPendingMessages = stringRedisTemplate.opsForStream().read(Consumer.from(GROUP_NAME, CONSUMER_NAME),StreamReadOptions.empty().count(10), // 每次读取10条StreamOffset.create(STREAM_KEY, ReadOffset.from(">")) // ">" 表示 Pending 列表);if (pendingMessages == null || pendingMessages.isEmpty()) {// 短暂休眠,避免频繁轮询Thread.sleep(1000);}for (MapRecord<String, Object, Object> message : pendingMessages) {... // 重试模式逻辑内容}}
}
阻塞队列
-
特点:生产者消费者模型,利用阻塞队列避免忙等待
-
原理:
List
类型的BLPOP/BRPOP
命令RPUSH key value [value ...]
:从右侧插入数据(生产者使用,左查对应右插,保证任务时间上的相对顺序)BLPOP key [key ...] timeout
:从左侧阻塞地弹出元素,若列表为空则阻塞直至有数据或超时(返回nil)- 为保证顺序,右插—左取、左插—右取
-
RedisTemplate
中没有BLPOP命令对应的方法,可以采用以下两种方法RedisTemplate.execute()
调用原生命令(推荐使用)- 直接使用 Lettuce/Jedis(不建议,因为会降低与 Spring 生态的集成度)
-
举例:用户主动取消支付,实时异步执行取消订单
- 消费者端获取消息
- 消费者端异步自动消费,可设置阻塞时间(例如空列表时一直阻塞)
@Service
public class OrderService { // 生产者@Autowiredprivate StringRedisTemplate redisTemplate;public void cancelOrder(OrderDto orderDto) {String orderTask = JSON.toJSONString(orderDto);redisTemplate.opsForList().rightPush("my_task_queue", orderTask);}
}
@Component
public class OrderQueueConsumer { //消费者private final StringRedisTemplate redisTemplate;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate redisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redisTemplate = redisTemplate;this.consumerPool = consumerPool;consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {String task = redisTemplate.execute((RedisConnection connection) -> {List<byte[]> result = connection.bLPop(0, "order_cancel_queue".getBytes()); // 0 = 无限阻塞if (result != null && !result.isEmpty()) {return new String(result.get(1));}return null; // 理论上不会执行,因为 timeout=0});System.out.println("Processing task: " + task);}}
}
延迟队列
-
目的:当前任务执行完毕后,一段时间后自动执行指定任务
-
原理:
Zset
+轮询- 使用
Zset
构建一个延迟队列保存延迟任务,score为过期时间,这样可以使用范围查询高效获取到期任务 - 使用定时任务(例如Spring的
@Scheduled
功能)查询到期任务 - 处理到期任务,为了解耦延迟逻辑和任务执行逻辑,也可以将到期任务放入任务队列中进一步处理
- 使用
-
其他方案:希望更准时执行任务
- 使用Redissson的
RDelayedQueue
功能 - 使用MQ的延迟消息功能
- 使用Redissson的
-
举例:订单超时自动取消
- 用户下订单后,将临时订单信息保存到redis缓存,并将此订单加入redis的延迟队列中
- 如果用户已付款,则将该订单信息保存到数据库中,删除延迟队列中的该订单信息
- 如果用户超时未付款,删除延迟队列和缓存中的该订单信息
@Service
public class TemporaryOrdersDelayed {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate OrderMapper orderMapper;/** 添加延迟任务* @param taskId 任务ID* @param delaySeconds 延迟时长(秒)* @param taskData 任务内容*/public void addDelayedTask(Long orderId, Long userId, long delaySeconds) {long executeTime = Instant.now().getEpochSecond() + delaySeconds;//加入延迟队列/*延迟队列使用ZSet保存,便于范围查询*key为order:delayed_queue*value为userId:oderId*/stringRedisTemplate.opsForZSet().add("order:delayed_queue", userId+":"+oderId, executeTime);}// 每秒检查一次到期任务@Scheduled(fixedRate = 1000)public void pollTasks() {long now = Instant.now().getEpochSecond();// 查询到期任务,即时间早于当前时间Set<String> orders = stringRedisTemplate.opsForZSet().rangeByScore("order:delayed_queue", 0, now);if (orders == null || orders.isEmpty()) {return; // 无任务,直接返回}// 执行到期任务,这里直接执行,逻辑较复杂时建议放入阻塞队列中进一步处理for (String order : orders) {String[] orderArray = order.trim().split(":");Long userId = Long.parseLong(orderArray[0]);Long orderId = Long.parseLong(orderArray[1]);// 查询数据库,判断用户是否付款OrderPo orderPo = orderMapper.getOrder(orderId);if (orderPo == null){ //未付款,取消订单,恢复库存// 如果服务部署了多个实例,这里就需要保证原子性,防止超卖stringRedisTemplate.opsForSet().remove("order:user:"+userId, orderId); stringRedisTemplate.delete("order:"+orderId);stringRedisTemplate.opsForHash().increment("good:"+orderDto.getGoodId(), "stoke", 1);}// 延迟队列移除任务stringRedisTemplate.opsForZSet().remove("order:delayed_queue", order);}}
}
发布订阅
-
目的:生产者发出消息广播,消费者实时接收消息通知
-
原理:redis发布订阅功能(Pub/Sub)
- 创建一个频道,订阅端先订阅此频道
- 发布端对此频道发布消息,订阅端会实时接收到消息
-
发布订阅功能应用场景
- 消息不持久化,不适用核心业务
- 适用于发布订阅一对一场景,否则可能会消息堆积
-
redis命令行演示
#打开一个客户端订阅频道wyh,必须先订阅才能接受消息 127.0.0.1:6379> subscribe wyh Reading messages... (press Ctrl-C to quit) #打开另一个客户端,给wyh频道发布消息hello 127.0.0.1:6379> publish wyh hello #第一个客户端可以实时接收消息 127.0.0.1:6379> SUBSCRIBE wyh Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "wyh" 3) (integer) 1 1) "message" 2) "wyh" 3) "hello"
-
举例:实时更新订单状态
- 创建任务订阅类:实现
MessageListener
接口即可 - 配置redis消息监听器
RedisMessageListenerContainer
:绑定订阅者和频道 - 发布者向频道发布消息:
redisTemplate.convertAndSend(channel, message)
@Service public class oderService { //发布者@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发布消息到指定频道* @param channel 频道名* @param message 消息内容*/public void publish(String channel, String message) {redisTemplate.convertAndSend(channel, message);} }
@Component public class OrderSubscriber implements MessageListener { //订阅者@Autowiredprivate StringRedisTemplate stringRedisTemplate;/** MessageListener接口已经默认实现了使用Message传输消息* 如果不实现MessageListener接口,就需要在Redis消息监听容器配置类中设置消息转换器*/@Overridepublic void onMessage(Message message, byte[] pattern) {String taskData = new String(message.getBody());String channel = new String(message.getChannel());String patternStr = pattern != null ? new String(pattern) : "null"; //pattern是订阅模式,普通订阅为null// 执行消费任务逻辑(如调用服务处理任务)OrderDto orderDto = JSON.parseObject(taskData, OrderDto.class);// 更新订单状态,key:orderID,value:状态stringRedisTemplate.opsForValue().set(orderDto.getOrderId,orderDto.getStatus);} }
@Configuration public class RedisPubSubConfig { // Redis消息监听容器配置类private final OrderSubscriber orderSubscriber; // 订阅者private final RedisConnectionFactory redisConnectionFactory;@Autowiredpublic RedisPubSubConfig(OrderSubscriber orderSubscriber, RedisConnectionFactory redisConnectionFactory) {this.orderSubscriber = orderSubscriber;this.redisConnectionFactory = redisConnectionFactory;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 订阅频道:channel:ordercontainer.addMessageListener(orderSubscriber, new ChannelTopic("channel:order"));// 可以订阅多个频道// container.addMessageListener(subscriber, new ChannelTopic("channel:news"));return container;} }
- 创建任务订阅类:实现
业务幂等
-
目的:因网络等问题前端发送重复请求时,后端只处理一次,也可以用于解决MQ重复消费问题
-
原理
- 用户发出真正的业务请求前先请求后端生成一个关联的token,保存到redis中,将token返回前端
- 请求携带此token发送给后端,后端去redis查,如果存在说明请求未处理,处理完删除redis中的token
- 如果后端未查到token,说明是重复请求,直接返回即可
@RestController
@RequestMapping("/repead-api")
public class TokenController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate UserService userService;// 前端发送请求前先申请一个唯一token@GetMapping("/generate-token")public Result generateToken() {// 生成唯一Token (这里使用UUID)String token = UUID.randomUUID().toString();// 存入Redis,设置过期时间stringRedisTemplate.opsForValue().set("req_token:" + token, "1", // 值可以是任意内容,我们只关心key是否存在60, // 60秒过期TimeUnit.SECONDS);return Result.ok(token);}// 处理业务请求@PostMapping("/actual-endpoint")public Result handleRequest(HttpServletRequest request) { // 1. 验证Token是否存在String token = request.getHeader("X-Request-Token");if (Boolean.FALSE.equals(redisTemplate.hasKey("req_token:" + token))) {// Token不存在或已使用,返回重复请求提示return Result.fail("重复请求,请勿重复提交");}try {// 2. token存在,执行业务,删除TokenstringRedisTemplate.delete("req_token:" + token);return userService.api();} catch (Exception e) {// 异常处理return Result.fail("服务器错误");}}
}
缓存查询
-
目的:缓存查询的redi应用最广泛的功能,作为服务器和数据库间的缓存中间件,缓解了数据库高并发的风险
-
原理
- 用户查询数据时,先查询redis中的数据,查询成功直接返回
- 如果redis查不到数据,再查数据库中的数据,查询成功后再将数据同步到redis(根据业务设置有效期)
- 尽量防止查询到数据库层面,否则有数据库宕机风险(缓存穿透,缓存击穿)
@Service
public class ShopServiceImpl implements IShopService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate ShopMapper shopMapper;@Overridepublic Result getShopById(Long id) {// 1.从redis查询商铺String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.判断商铺是否存在if (shopJson != null && !shopJson..trim().isEmpty()){// 3.商铺存在,则返回商铺信息Shop shop = JSON.parseObject(shopJson, Shop.class);return Result.ok(shop);}// 4.redis中没有商铺,则去数据库中查Shop shop = shopMapper.selectById(id);if (null == shop){// 5.数据库中也不存在,说明没有这个商铺,查询失败return Result.fail("商铺不存在");}// 6.数据库中可以查到,于是更新缓存并设置有效期stringRedisTemplate.opsForValue().set("cache:shop:"+id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);return Result.ok(shop);}
}
缓存更新
-
目的:用户修改数据时,缓存也要同步更新
-
原理
- 用户更新操作是少量的,可以直接操作数据库
- 更新完数据库后,直接删除对应缓存即可,无需手动更新,因为下次再查询此数据时会自动同步缓存
- 不能先删除缓存,否则高并发场景下,可能还没更新完数据库,方式并发查询导致缓存又同步了,导致缓存删除失败
@Service
public class ShopServiceImpl implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;@Override@Transactionalpublic Result modify(Shop shop) {//1.更新数据库int id = shopMapper.updateById(shop);System.out.println("数据库更新完成");//2.删除缓存stringRedisTemplate.delete("cache:shop:"+shop.getId());System.out.println("已删除缓存");return Result.ok(id);}
缓存穿透
-
描述:用户查询的key在redis和数据库都不存在,会直接访问到数据库中,短时间高并发访问此数据会导致数据库宕机
-
解决方法
- 建立空值缓存:如果用户查询到数据库还没有,就将此数据缓存到redis中,相应的值设为空或者约定的错误码
- 布隆过滤:不靠谱
- 限流:当请求量超过系统处理能力或者单个用户恶意发送大量请求时,限流会拒绝非法请求
@Service
public class ShopServiceImpl implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;@Overridepublic Result getShopById(Long id) {// 1.从redis查询商铺String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.判断商铺是否存在if (shopJson != null && !shopJson..trim().isEmpty()){// 3.商铺存在,则返回商铺信息Shop shop = JSON.parseObject(shopJson, Shop.class);return Result.ok(shop);}// 4.redis中没有商铺,则去数据库中查Shop shop = shopMapper.selectById(id);if (null == shop){// 5.数据库中也不存在,说明没有这个商铺,防止恶意请求攻击,对此请求中的id进行缓存空对象stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",60L,TimeUnit.MINUTES);return Result.fail("商铺不存在");}// 6.数据库中可以查到,于是更新缓存并设置有效期stringRedisTemplate.opsForValue().set("cache:shop:"+id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);return Result.ok(shop);}
}
缓存击穿
-
描述:高并发场景下热点key过期,缓存还未来得及同步,大量请求就已经查询该数据,导致击穿到数据库
-
解决方法
- 加分布式锁:第一个线程访问数据库时,对同步缓存过程加锁,保证同一时间只有一个线程访问数据库,此时其他线程无法持锁,访问失败
- 逻辑过期:热点key始终不过其,而是根据业务设置逻辑过期时间,用户访问时逻辑过期就更新缓存,实现缓存定时更新,这样做还可以在请求未能获得锁时不会失败而是返回旧数据,增强用户体验
- 限流:当请求量超过系统处理能力或者单个用户恶意发送大量请求时,限流会拒绝非法请求
-
缓存击穿问题发生在极端情况下,普通查询不必考虑缓存击穿问题,只需考虑缓存穿透问题即可
// 缓存值包装类,包含逻辑过期时间
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class RedisShopData {private LocalDateTime expireTime;private Shop shop;
}@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate ShopMapper shopMapper;// 查询热点数据@Overridepublic Result queryHotSpot(Long id) { String threadId = String.valueOf(Thread.currentThread().getId()); // 当前线程IDtry {// 1.查询redisString shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);// 2.热点key不会过期,如果查不到,不用查询数据库,可以认为没有这个数据if (shopJson == null || shopJson.trim().isEmpty()){return Result.fail("没有这个店铺");}RedisShopData redisShopData = JSON.parseObject(shopJson, RedisShopData.class);Shop shop = redisShopData.getShop();LocalDateTime expireTime = redisShopData.getExpireTime();// 3.判断是否逻辑过期if (expireTime.isAfter(LocalDateTime.now())){return Result.ok(shop); //没有过期,直接返回}// 4.逻辑过期,准备拿锁进行缓存更新Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent("lock:shop:" + id, threadId, 10, TimeUnit.SECONDS);// 5.没有拿到锁,于是直接返回旧的数据if (!Boolean.TRUE.equals(ifAbsent)){return Result.ok(shop);}try{// 6.成功拿到锁,双重检查,防止数据已被其他线程更新String newShopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);RedisShopData newRedisShopData = JSON.parseObject(newShopJson, RedisShopData.class);if (newRedisShopData.getExpireTime().isAfter(LocalDateTime.now())) {return Result.ok(newRedisShopData.getShop()); //已经被更新,直接返回}// 7.开始查询数据库更新缓存,重置逻辑时间(一分钟)Shop newShop = shopMapper.selectById(id);newRedisShopData.setShop(newShop);newRedisShopData.setExpireTime(LocalDateTime.now().plusSeconds(60));stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSON.toJSONString(newRedisShopData));return Result.ok(newShop);} finally {// 7.释放锁,注意防止锁误删if (threadId.equals(stringRedisTemplate.opsForValue().get("lock:shop:" + id))){stringRedisTemplate.delete("lock:shop:" + id);}}} catch (Exception e) {return Result.fail("服务器错误");}}
}
缓存雪崩
-
描述:缓存中的大量key同时过期或缓存服务宕机,导致所有请求直接打到数据库
-
解决方法
- 大量key同时过期:在设置缓存时,给过期时间增加一个随机值,避免大量缓存同时失效、使用分级缓存
- redis宕机:搭建redis集群、或者使用熔断机制
- 限流策略
-
缓存雪崩问题发生比缓存击穿发生概率更低,正常场景下只需考虑缓存穿透问题即可
1.背景问题:用户购买商品下单后,后台生成了临时订单,扣减了库存,但用户迟迟不付款,导致商品一直显示售空但实际上并没有
2.解决方案:用户下临时订单后,对临时订单设立逻辑有效期(这里可以用Set保存临时订单),再开启定时任务,定时扫描临时订单,如果扫描到订单过期,则执行关闭订单动作:回滚库存,删除临时订单
位图统计
-
原理:对于布尔型数据,redis提供了Bitmap数据类型存储,占用空间极低
- 基本数据类型是用文本保存内容,如果存储布尔型数据就比较浪费空间
- Bitmap本质上也是字符串(key-value) , 但是它可以对value的位进行操作,每一位只能存储0和1
- Bitmap(key-value)的value位数会根据情况动态扩容
-
Bitmap常见命令
SETBIT key offset value # 设置偏移量为offset的位为0或1 GETBIT key offset # 获取偏移量为offset的位的值 BITCOUNT key [start end] # 统计指定字节范围内1的数量 BITPOS key bit [start end] # 查找第一个值为0或1的位的位置
-
举例:签到系统
- Key:
user:sign:{userId}:{yearMonth}
(如user:sign:1001:202507
表示用户 1001 在 2025 年 7 月的签到数据) - Value:每一位代表一天,已签到则设为1(第 0 位 = 1 号,…,第 30 位 = 31 号)
BITCOUNT user:sign:{userId}:{yearMonth}
:统计当月签到次数
- Key:
@Service
public class SignService {@Resourceprivate StringRedisTemplate redisTemplate;/*** 用户签到* @param userId 用户ID* @param date 签到日期(默认当天)* @return 是否签到成功(true=首次签到,false=已签到)*/public boolean doSign(Long userId, LocalDate date) {String key = String.format("user:sign:%d:%d%02d", userId, date.getYear(), date.getMonthValue());int offset = date.getDayOfMonth() - 1; // 计算偏移量(0=1号,1=2号,...)// 使用 BITFIELD 或 SETBIT 设置签到状态Boolean success = redisTemplate.opsForValue().setBit(key, offset, true);return Boolean.TRUE.equals(success); // 如果原值为false(未签到),返回true表示首次签到}/*** 检查用户某天是否已签到* @param userId 用户ID* @param date 查询日期* @return true=已签到,false=未签到*/public boolean checkSign(Long userId, LocalDate date) {String key = String.format("user:sign:%d:%d%02d", userId, date.getYear(), date.getMonthValue());int offset = date.getDayOfMonth() - 1;Boolean isSign = redisTemplate.opsForValue().getBit(key, offset);return Boolean.TRUE.equals(isSign);}/*** 统计用户当月签到次数* @param userId 用户ID* @param yearMonth 年月(如 "202507")* @return 签到次数*/public long getSignCount(Long userId, String yearMonth) {String key = String.format("user:sign:%d:%s", userId, yearMonth);return redisTemplate.execute(connection -> connection.bitCount(key.getBytes()));}/*** 获取用户当月连续签到天数* @param userId 用户ID* @param date 查询日期(默认当天)* @return 连续签到天数*/public long getContinuousSignCount(Long userId, LocalDate date) {String key = generateKey(userId, date);int daysInMonth = date.lengthOfMonth();int todayOffset = date.getDayOfMonth() - 1;long continuousCount = 0;for (int i = todayOffset; i >= 0; i--) {Boolean isSign = redisTemplate.opsForValue().getBit(key, i);if (Boolean.TRUE.equals(isSign)) {continuousCount++;} else {break;}}return continuousCount;}
}