当前位置: 首页 > ds >正文

[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

在开发用户搜索功能时,我们通常会将用户信息存储到 Elasticsearch(简称 ES) 中,以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步,以及如何通过 MQ 消息队列实现用户信息实时更新 的机制。

一、整体思路

为了保证用户数据在 MySQL 与 ES 之间保持一致,我们采用了以下 双通道同步策略

  1. 定时任务 + 游标机制:实现 MySQL 到 ES 的增量同步

  2. 通过 MQ(消息队列) 实现实时同步用户更新/删除操作到 ES


二、定时任务增量同步逻辑详解

我们定义了一个定时任务 syncUserDataToESJob,主要用于从 user 表中 增量拉取变动数据,并同步到 ES。

✨ 增量拉取机制

为了避免全量同步的高开销,我们使用了 “更新时间 + 主键 ID”双重游标,实现分页增量同步:

List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);

其中:

  • lastSyncTime 表示上次同步的最大更新时间

  • lastMaxId 用于处理相同更新时间下的并发写入

🧠 同步逻辑核心代码如下:

@XxlJob("syncUserDataToESJob")
@GlobalTransactional
public void syncUserData() {Date lastSyncTime = syncPointService.getLastSyncTime();Long lastMaxId = syncPointService.getLastMaxId();if (lastSyncTime == null) {lastSyncTime = new Date(0); // 默认从最早开始lastMaxId = 0L;}Date maxUpdateTime = lastSyncTime;Long maxId = lastMaxId;boolean hasNewData = false;while (true) {List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);if (usersBatch.isEmpty()) break;hasNewData = true;List<EsUserDoc> esDocs = usersBatch.stream().map(this::convertToEsDoc).collect(Collectors.toList());esClient.bulkIndex(esDocs);for (User u : usersBatch) {Date updateTime = u.getUpdateTime();if (updateTime.after(maxUpdateTime)) {maxUpdateTime = updateTime;maxId = u.getId();} else if (updateTime.equals(maxUpdateTime) && u.getId() > maxId) {maxId = u.getId();}}lastSyncTime = maxUpdateTime;lastMaxId = maxId;}// 同步删除数据List<Long> deletedUserIds = userClient.selectDeletedUserIds(syncPointService.getLastSyncTime(), syncPointService.getLastMaxId());if (!deletedUserIds.isEmpty()) {esClient.bulkDeleteByIds(deletedUserIds);}if (hasNewData) {log.info("更新同步点:maxUpdateTime = {}, maxId = {}", maxUpdateTime, maxId);syncPointService.updateLastSyncPoint(maxUpdateTime, maxId);} else {log.info("本次没有增量数据,不更新同步点");}
}

📝 特别说明:

  • syncPointService 用于记录上次同步的时间点和 ID,保证每次定时任务可重复安全执行。

  • 如果服务中断重启,也不会造成数据丢失或重复。


三、用户修改通过 MQ 实时同步到 ES

虽然定时任务可以周期性同步,但如果用户更新昵称、头像、标签等信息,等待下一次定时任务才能生效,可能会造成 数据延迟

为此,我们引入了 消息队列机制,实现实时更新:

✅ 使用 MQ 的同步方案

  1. 用户信息发生变化时,在业务服务中发送一条消息:

UserUpdateMessage message = new UserUpdateMessage(userId);
rabbitTemplate.convertAndSend("user.topic.exchange", "user.update", message);
  1. 在 ES 同步服务中监听消息并处理:

@RabbitListener(queues = "user.update.queue")
public void onUserUpdate(UserUpdateMessage msg) {User user = userClient.getUserById(msg.getUserId());if (user != null) {EsUserDoc doc = convertToEsDoc(user);esClient.index(doc);}
}

💡 好处:

  • 实时:用户更新后立即同步到 ES

  • 解耦:业务逻辑与搜索逻辑分离

  • 高性能:避免频繁更新 ES


四、总结与展望

通过“定时任务 + 增量游标” 和 “消息队列实时更新” 的结合方案,我们实现了对用户数据高效且可靠的同步到 Elasticsearch。

同步方式特点使用场景
定时任务批量、容错性强周期性同步新增/修改/删除
MQ 实时快速、解耦用户主动更新资料时快速生效

未来我们还可以扩展以下能力:

  • 引入 Canal + Binlog 监听实现更彻底的实时同步

  • 支持多租户分库分表的场景下数据同步

  • 引入失败重试机制保障消息不丢


希望本文对你在做数据同步或 ES 架构设计时有所启发,欢迎点赞、收藏、评论交流!

http://www.xdnf.cn/news/8731.html

相关文章:

  • Linux(6)——第一个小程序(进度条)
  • LeetCode 2942.查找包含给定字符的单词:使用库函数完成
  • 台式机安装新的固态硬盘后无显示
  • 【C语言练习】060. 使用指针操作字符串
  • Kotlin全栈工程师转型路径
  • Vue-创建应用/挂载应用/根组件模版-.vue单文件/应用配置
  • Cesium中根据不同条件设置3D Tiles样式
  • 【VBA 中GetOpenFilename】常用友好的人机交互文件全路径选择模式
  • 计算机视觉与深度学习 | 基于 YOLOv8 + BeautyGAN + CodeFormer + Face Parsing 实现简单的人脸美颜
  • 【来自纳米AI-大模型】ubuntu 24.04 登陆界面分辨率太高,内容显示得特别小 问题解决方案(亲测有效)
  • lua脚本学习笔记1:Vscode添加lua环境_lua基本语法
  • HarmonyOS赋能套件介绍
  • 开篇:MCP理论理解和学习
  • 元组可以比较大小吗?一次返回多个值?编程语言的元组?声明变量一定需要指定类型吗?
  • Ubuntu20.04 gr-gsm完整安装教程
  • Kanass V1.1.1版本发布,支持查看项目/迭代/事项进度
  • 剖析 Spring 中 @ResponseBody 原理与 Tomcat NIO 写事件(SelectionKey.OP_WRITE)的协作机制
  • MySQL分库分表
  • vue3中使用computed
  • Spark集群架构解析:核心组件与Standalone、YARN模式深度对比(AM,Container,Driver,Executor)
  • kafka之操作示例
  • 大文件上传,对接阿里oss采用前端分片技术。完成对应需求!
  • 【MySQL】第7节|Mysql锁机制与优化实践以及MVCC底层原理剖析
  • ubuntu 安装latex
  • 清除 Ubuntu 磁盘空间
  • 安卓开发用到的设计模式(2)结构型模式
  • 开发者工具箱-鸿蒙金额转换开发笔记
  • R语言学习--Day08--bootstrap原理及误区
  • Ollama01-安装教程
  • 【MySQL】07.表内容的操作