当前位置: 首页 > news >正文

redis的pipline使用结合线程池优化实战

文章目录

  • 代码
  • 讲解
      • 与事务 (`MULTI/EXEC`) 的区别
      • 在你这段代码里的价值
      • 可能的坑
        • 实战建议

代码

 /*** 批量根据用户 ID 查询用户信息** @param findUsersByIdsReqDTO* @return*/@Overridepublic Response<List<FindUserByIdRspDTO>> findByIds(FindUsersByIdsReqDTO findUsersByIdsReqDTO) {// 需要查询的用户 ID 集合List<Long> userIds = findUsersByIdsReqDTO.getIds();// 构建 Redis Key 集合List<String> redisKeys = userIds.stream().map(RedisKeyConstants::buildUserInfoKey).toList();// 先从 Redis 缓存中查, multiGet 批量查询提升性能List<Object> redisValues = redisTemplate.opsForValue().multiGet(redisKeys);// 如果缓存中不为空if (CollUtil.isNotEmpty(redisValues)) {// 过滤掉为空的数据redisValues = redisValues.stream().filter(Objects::nonNull).toList();}// 返参List<FindUserByIdRspDTO> findUserByIdRspDTOS = null;// 将过滤后的缓存集合,转换为 DTO 返参实体类if (CollUtil.isNotEmpty(redisValues)) {findUserByIdRspDTOS = redisValues.stream().map(value -> JsonUtils.parseObject(String.valueOf(value), FindUserByIdRspDTO.class)).toList();}// 如果被查询的用户信息,都在 Redis 缓存中, 则直接返回if (CollUtil.size(userIds) == CollUtil.size(findUserByIdRspDTOS)) {return Response.success(findUserByIdRspDTOS);}// 还有另外两种情况:一种是缓存里没有用户信息数据,还有一种是缓存里数据不全,需要从数据库中补充// 筛选出缓存里没有的用户数据,去查数据库List<Long> userIdsNeedQuery = null;if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) {// 将 findUserInfoByIdRspDTOS 集合转 MapMap<Long, FindUserByIdRspDTO> map = findUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 筛选出需要查 DB 的用户 IDuserIdsNeedQuery = userIds.stream().filter(id -> Objects.isNull(map.get(id))).toList();} else { // 缓存中一条用户信息都没查到,则提交的用户 ID 集合都需要查数据库userIdsNeedQuery = userIds;}// 从数据库中批量查询List<UserDO> userDOS = userDOMapper.selectByIds(userIdsNeedQuery);List<FindUserByIdRspDTO> findUserByIdRspDTOS2 = null;// 若数据库查询的记录不为空if (CollUtil.isNotEmpty(userDOS)) {// DO 转 DTOfindUserByIdRspDTOS2 = userDOS.stream().map(userDO -> FindUserByIdRspDTO.builder().id(userDO.getId()).nickName(userDO.getNickname()).avatar(userDO.getAvatar()).introduction(userDO.getIntroduction()).build()).collect(Collectors.toList());// 异步线程将用户信息同步到 Redis 中List<FindUserByIdRspDTO> finalFindUserByIdRspDTOS = findUserByIdRspDTOS2;threadPoolTaskExecutor.submit(() -> {// DTO 集合转 MapMap<Long, FindUserByIdRspDTO> map = finalFindUserByIdRspDTOS.stream().collect(Collectors.toMap(FindUserByIdRspDTO::getId, p -> p));// 执行 pipeline 操作redisTemplate.executePipelined((RedisCallback<Void>) connection -> {for (UserDO userDO : userDOS) {Long userId = userDO.getId();// 用户信息缓存 Redis KeyString userInfoRedisKey = RedisKeyConstants.buildUserInfoKey(userId);// DTO 转 JSON 字符串FindUserByIdRspDTO findUserInfoByIdRspDTO = map.get(userId);String value = JsonUtils.toJsonString(findUserInfoByIdRspDTO);// 过期时间(保底1天 + 随机秒数,将缓存过期时间打散,防止同一时间大量缓存失效,导致数据库压力太大)long expireSeconds = 60*60*24 + RandomUtil.randomInt(60*60*24);redisTemplate.opsForValue().set(userInfoRedisKey, value, expireSeconds, TimeUnit.SECONDS);}return null;});});}// 合并数据if (CollUtil.isNotEmpty(findUserByIdRspDTOS) && CollUtil.isNotEmpty(findUserByIdRspDTOS2)) {findUserByIdRspDTOS.addAll(findUserByIdRspDTOS2);}return Response.success(findUserByIdRspDTOS);}

讲解

RedisTemplate.executePipelined(…) 里把多条 SET 操作放进 pipeline(管道),主要有 3 个好处——简记“省时、省包、增吞吐”。

优势说明什么时候最明显
1. 省时:减少 RTT一次 pipeline 会先把 N 条命令 打包写入 Socket,然后再统一读回 N 条结果
原来一条命令要一次 Request → Response 往返(RTT),现在 N 条命令只要 1 次 RTT
网络时延不小(跨 IDC、云 Redis)或批量写入上万条键时,节省几十到上百毫秒
2. 省包:减少 TCP/IO 开销连发 1000 条 SET不做管道 = 1000 个包;管道 = 1 个包写 + 1 个包读
少了大量系统调用与 TCP 包头,CPU 使用率和带宽占用都更低。
批量插入/更新、导入数据、刷新缓存
3. 增吞吐:服务器侧一次性处理Redis 收到整段命令流后,在单线程内部顺序执行,避免了命令‐等待‐回复的空闲间隙,同一时间窗口可以执行更多指令。高并发场景下,比单条调用 QPS 可提升数倍

👉 简化理解:把 1000 趟“银行柜台排队”换成“把 1000 张单子一次塞给柜员”。


与事务 (MULTI/EXEC) 的区别

特性PipelineMULTI…EXEC 事务
原子性❌ 不保证;命令只是顺序执行✔ 执行期内一次性、原子提交
回包时机一口气回 N 条结果EXEC 时一次性回所有结果
用途性能优化(批量、同步刷库)一致性(要么全部成功要么失败)

很多业务里两者会 一起用:先 MULTI、把命令写进管道、最后 EXEC,既批量又原子。


在你这段代码里的价值

redisTemplate.executePipelined((RedisCallback<Void>) conn -> {for (UserDO u : userDOS) {conn.stringCommands().set(rawKey(u.getId()),          // set keyrawValue(dtoJson(u)),       // set valueExpiration.seconds(ttl),    // expireSetOption.UPSERT            // NX/XX 选项);}return null;
});
  1. 用管道写入全部用户缓存
    • 只走 1 次 Socket 写 + 1 次读,避免 300 条 set * 300 次 RTT。
  2. 线程池异步执行:主线程即刻返回,提升接口响应速度。
  3. TTL 打散:即便一起写,也给每条 key 加随机过期;结合管道不会产生“雪崩”。

可能的坑

  1. 结果一次性回传 —— 如果 N 很大,客户端要等所有命令执行完才能拿到结果,期间内存会上涨。
  2. 非原子 —— 若中间有一条命令写失败,不会回滚之前已成功的命令;业务需要能接受“部分成功”。
  3. 大批量要分段 —— 数十万条写同一管道可能撑爆客户端/服务器缓冲区,建议 5k~10k 条一批。

实战建议
  • 小批量 (< 20) 就别开管道,省下序列化/反序列化集合的开销。
  • 中批量 (几十~几千):管道最划算,RTT 成本明显下降。
  • 大批量 (万级以上):用 SCAN + 管道分批,或直接走异步任务/数据导入工具。

一句话:Pipeline 让“成百上千条命令”在网络上看起来像“一条”,极大降低网络和系统调用成本,是 Java 里批量读写 Redis 的首选加速器。

http://www.xdnf.cn/news/497629.html

相关文章:

  • Java大师成长计划之第25天:Spring生态与微服务架构之容错与断路器模式
  • Qt 强大的窗口停靠浮动
  • Javascript:WebAPI
  • React Fiber 架构深度解析:时间切片与性能优化的核心引擎
  • ARM (Attention Refinement Module)
  • spring -MVC-02
  • DeepSeek赋能电商,智能客服机器人破解大型活动人力困境
  • 数组集合互转问题
  • Ubuntu 安装 squid
  • 服装零售逆势密码:从4月英国7%增长看季节性消费新模型
  • 中国30米年度土地覆盖数据集及其动态变化(1985-2022年)
  • 一个指令,让任意 AI 快速生成思维导图
  • Unity序列化字段、单例模式(Singleton Pattern)
  • 通俗版解释CPU、核心、进程、线程、协程的定义及关系
  • 动态规划-64.最小路径和-力扣(LetCode)
  • c#车检车构客户管理系统软件车辆年审短信提醒软件
  • 系统架构设计(九):分布式架构与微服务
  • pytorch小记(二十二):全面解读 PyTorch 的 `torch.cumprod`——累积乘积详解与实战示例
  • Java求职面试:从核心技术到大数据与AI的场景应用
  • [Android] 安卓彩蛋:Easter Eggs v3.4.0
  • 第五项修炼:打造学习型组织
  • 前端基础之CSS
  • 大语言模型 11 - 从0开始训练GPT 0.25B参数量 MiniMind2 准备数据与训练模型 DPO直接偏好优化
  • 【诊所电子处方专用软件】佳易王个体诊所门诊电子处方开单管理系统:零售药店电子处方服务系统#操作简单#诊所软件教程#药房划价
  • Java 快速转 C# 教程
  • 30、WebAssembly:古代魔法——React 19 性能优化
  • 手撕四种常用设计模式(工厂,策略,代理,单例)
  • 设计模式Java
  • IDEA反斜杠路径不会显示JUnit运行的工作目录配置问题
  • 信奥赛-刷题笔记-栈篇-T2-P1981表达式求值0517