Canal使用
Canal
1.项目问题分析-数据同步问题
1.1问题分析
问题一:专辑数据修改同步缓存的问题
/*** 根据专辑ID查询专辑信息(包含专辑标签及值)-从数据库获取* 存入redis中业务数据key=album:info:1 锁key=album:info:1:lock* @param id* @return*/@Override@GuiGuCache(prefix="album:info:") public AlbumInfo getAlbumInfoFromDB(Long id) {//1.根据专辑ID主键查询专辑信息AlbumInfo albumInfo = albumInfoMapper.selectById(id);//2.根据专辑ID查询标签列表if (albumInfo != null) {LambdaQueryWrapper<AlbumAttributeValue> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(AlbumAttributeValue::getAlbumId, id);List<AlbumAttributeValue> attributeValueList = albumAttributeValueMapper.selectList(queryWrapper);albumInfo.setAlbumAttributeValueVoList(attributeValueList);}return albumInfo;}
问题说明:
在查询专辑详情数据时,为避免频繁io,提高查询效率,在详情数据获取的同时,利用缓存机制,将详情数据存储到redis实现的缓存机制中,但带来效率提升的同时也带来了问题,就是如果在专辑录入之后,因为业务数据更新等原因,对于mysql中专辑原数据进行了修改,将导致redis中缓存数据和mysql中原始数据不一致的问题
问题二:用户信息缓存问题
UserInfoServiceImpl用户登录成功后
/*** 查询用户基本信息** @param userId* @return*/@Override@GuiGuCache(prefix = "userInfoVo:")public UserInfoVo getUserInfo(Long userId) {//1.根据主键ID查询用户对象UserInfo userInfo = userInfoMapper.selectById(userId);//2.将用户PO对象转为VO对象return BeanUtil.copyProperties(userInfo, UserInfoVo.class);}
问题说明:
用户认证登录实现成功后,再查询用户信息回显,第一次从mysql的user_info表中查询数据后,对数据进行了缓存,若用户在之后对个人信息进行了更新或者补充,将导致数据不一致,不能及时更新的问题。
1.2方案说明
常用方案:
1.redis的延时双删策略
2.消息队列同步
3.canal同步
第一、redis的延时双删策略
延时双删策略是一种常见的保证MySQL和Redis数据一致性的方法。其主要流程包括:先删除缓存,然后更新数据库。这个过程完成后,大约在数据库从库更新后再次删除缓存。具体的步骤如下:
- 先执行redis.del(key)操作删除缓存;
- 然后执行写数据库的操作;
- 休眠一段时间(例如500毫秒),根据具体的业务时间来定;
- 再次执行redis.del(key)操作删除缓存。
延时双删策略通过这种方式尝试达到最终的数据一致性,但是这并不是强一致性,因为MySQL和Redis主从节点数据的同步并不是实时的,所以需要等待一段时间以增强它们的数据一致性。同时,由于读写是并发的,可能出现缓存和数据库数据不一致的问题
第二、消息队列同步方案
消息队列同步方案是一种常见的MySQL和Redis数据同步方法,其主要流程包括:将MySQL的数据变化消息队列同步方案是一种常见的MySQL和Redis数据同步方法,其主要流程包括:将MySQL的数据变化作为消息发布到消息队列(如Kafka、RabbitMQ等),然后由另一个消费者程序从消息队列中读取数据并写入Redis。这种方式可以实现数据的异步同步,降低系统压力。
具体步骤如下:
- 在MySQL数据库中进行增删改操作时,同时将相应的数据变化作为消息发布到消息队列中;
- 启动一个消费者程序,监听消息队列中的消息,并将消息中的MySQL数据变化写入Redis缓存中;
- 当消费者程序处理完一条消息后,需要确认该消息已被处理,可以使用消息队列的确认机制或者手动确认;
- 如果消费者程序出现异常或宕机等情况,需要保证消息队列中的消息不会丢失,可以选择持久化消息队列或将消息备份到其他存储介质中。
使用消息队列同步方案可以有效地解决MySQL和Redis数据同步的问题,并且能够实现数据的异步处理和削峰填谷的效果。但是由于消息队列本身也存在一定的延迟和可靠性问题。
第三、canal同步方案
Canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析的数据同步工具,可以实现MySQL数据库的增量数据订阅和消费。Canal方案可以用于实现MySQL和Redis之间的数据同步,具体流程如下:
- 在MySQL数据库中开启binlog日志功能,并配置Canal客户端连接到MySQL主库;
- 启动Canal客户端,监听MySQL主库的binlog日志变化,并将变化的数据实时推送到指定的队列中;
- 启动一个消费者程序,监听Canal客户端推送过来的数据,并将数据写入Redis缓存中;
- 当消费者程序处理完一条消息后,需要确认该消息已被处理,可以使用消息队列的确认机制或者手动确认;
- 如果消费者程序出现异常或宕机等情况,需要保证消息队列中的消息不会丢失,可以选择持久化消息队列或将消息备份到其他存储介质中。
使用Canal方案可以实现MySQL和Redis之间的实时数据同步,并且支持多种数据源和数据格式。但是需要注意的是,由于Canal方案依赖于MySQL的binlog日志,所以需要确保MySQL主库的binlog日志功能已经开启,并且Canal客户端能够正确地连接到MySQL主库
本项目中选择第三个方案:canal同步方案
2.数据同步方案- Canal
2.1 Canal 概念
2.1.1 什么是Canal
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
官网:https://github.com/alibaba/canal
2.1.2Canal的特征
Canal具有以下特征:
- 高性能:Canal 采用了基于网络协议的方式来解析和同步 MySQL 的增量日志,相较于数据库级别的触发器或轮询方式,可以提供更高的同步性能。
- 支持多种数据格式:Canal 可以将 MySQL 的增量日志解析为多种数据格式,包括 JSON、XML 等,方便用户进行二次开发和数据处理。
- 多种同步方式:Canal 支持多种同步方式,包括基于缓存、MQ、HTTP 接口等多种方式,可以根据业务需求选择不同的同步方式。
- 灵活的订阅机制:Canal 支持灵活的订阅机制,可以根据表、库、列等维度进行精确的订阅,同时也支持动态增加和删除订阅。
- 多种部署方式:Canal 可以在单机、集群等多种环境下进行部署,同时也支持 Docker 容器化部署。
- 易于扩展:Canal 采用了插件化的设计,支持用户自定义插件,可以方便地扩展新的功能。
总的来说,Canal 是一款功能强大、性能高效、易于使用、可扩展的数据同步工具,被广泛应用于阿里巴巴和其他企业的数据同步场景中
2.1.3业务场景
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
2.2 工作原理
工作原理
canal的工作原理是将自己伪装成mysql的slave节点,来订阅mysql binlog的变更,所以在配置启动canal前,需要先配置mysql。
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
2.2.1 canal的架构
Canal是C/S架构,由canal client与canal server组成,客户端和服务端之间是C/S模式的通信,客户端采用NIO,服务端采用Netty{netty.io}。canal server启动后,如果没有canal client,那么canal server不会去mysql拉取binlog。即Canal客户端主动发起拉取请求,服务端才会模拟一个MySQL Slave节点去主节点拉取binlog。通常Canal客户端是一个死循环,这样客户端一直调用get方法,服务端也就会一直拉取binlog.
组成说明:
- server 代表一个 canal 运行实例,对应于一个jvm进程
- instance 对应于一个数据队列 (1个 canal server 对应1个或者多个 instance )
- instance 下的子模块
- eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
- eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
- eventStore: 数据存储
- metaManager: 增量订阅 & 消费信息管理器
2.2.2 canal的工作原理
EventParser在向MySQL发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新Log Position。
Canal流程图:
具体流程:
- EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
- EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
- MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
- void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
3 Canal安装
安装使用canal前要提前安装好mysql,本章mysql版本选择mysq8.0.29版本,canal选择1.1.5版本
3.1安装前要求
3.1.1 mysql开启binlog模式
canal-admin: 图形化界面
(1)查看当前mysql是否开启binlog模式。
#进入mysql容器
mysql -uroot -proot
#开启binlog模式
SHOW VARIABLES LIKE '%log_bin%';
结果:mysql的log_bin默认是开启,如果log_bin的值为OFF是未开启,为ON是已开启。
(2) 进入mysql
mysql -h localhost -u root -p
(3)创建账号 用于测试使用
#创建用户
create user canal@'%' IDENTIFIED by 'canal';
#给用户授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
#如果是MySQL8.X以上需要对加密方式进行设置
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
#刷新生效
FLUSH PRIVILEGES;
3.2 安装Canal
(1)下载canal镜像
docker pull canal/canal-server:v1.1.5
(2)创建容器
docker run -p 11111:11111 --name canal \
-e canal.destinations=tingshuTopic \
-e canal.instance.master.address=192.168.200.130:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\..* \
-d canal/canal-server:v1.1.5
(3)查看启动日志
docker logs -f canal#日志详情
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Starting sshd: [ OK ]
Starting crond: [ OK ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ..
4.项目问题处理
canal-spring-boot-starter启动器
经过整合Canal官方提供的SimpleCanalClient方案已经可以实现,在数据库中数据发生改变时,通过binlog日志文件及时获取数据,但是在canal提供的api过于繁琐,因此可使用canal-spring-boot-starter
Canal-spring-boot-starter是一个基于Spring Boot框架的插件,用于实现Canal和Spring Boot框架的整合。Canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析工具,通过监听数据库中的binlog(二进制日志),实现对数据库的变更事件进行监听和捕捉,从而能够实现实时同步和分析数据库的变化。
Canal-spring-boot-starter将Canal和Spring Boot框架整合在一起,可以让开发者通过简单的配置,轻松地实现对数据库变更事件的监听和处理,可以方便地实现数据同步、消息推送、数据分析等功能。同时,Canal-spring-boot-starter还提供了一些默认的配置,包括数据源、表名、过滤规则等,可以让开发者快速上手并开始使用。
主要特征:
- 便捷性:Canal-spring-boot-starter基于Spring Boot框架,可以方便地整合到Spring Boot应用中,并提供了默认配置,让开发者可以快速上手使用。
- 功能强大:Canal-spring-boot-starter使用了Canal增量日志解析工具,可以实现对数据库变更事件的监听和捕捉,并支持多种操作类型的处理。
- 高可用性:Canal-spring-boot-starter支持多节点的部署,可以实现数据同步的高可用性。
- 易于扩展:Canal-spring-boot-starter提供了丰富的API和插件,可以方便地扩展和定制,满足不同场景下的需求。
4.1 service-cdc工程搭建
CDC(Change Data Capture,变更数据捕获)
service创建canal同步子工程
pom添加依赖
注意:此处不依赖父工程,作为独立工程存在
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.tingshu</groupId><artifactId>service-cdc</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!--web 需要启动项目--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>javax.persistence</groupId><artifactId>persistence-api</artifactId><version>1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>
</project>
application.yaml
spring:profiles:active: dev
application-dev.yaml
server:port: 7080
#canal配置
canal:destination: tingshuTopic #Canal服务端发送数据的话题名称跟上面容器里参数destinations的一样server: 192.168.200.130:11111
spring:redis:host: 192.168.200.130port: 6379
启动类CDCApplicaiton
package com.atguigu.tingshu;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author: atguigu* @create: 2024-1-9 10:53*/
@SpringBootApplication
public class CDCApplicaiton {public static void main(String[] args) {SpringApplication.run(CDCApplicaiton.class, args);}
}
添加redis配置类
com.atguigu.tingshu.config;
package com.atguigu.tingshu.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author: atguigu* @create: 2024-1-9 10:53*/
@Configuration
public class RedisConfig {@Bean@Primarypublic RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);//String的序列化方式StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// 使用GenericJackson2JsonRedisSerializer 替换默认序列化(默认采用的是JDK序列化)GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();//序列号key valueredisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}
}
entity
package com.atguigu.tingshu.model;import lombok.Data;import javax.persistence.Column;/**** @author: atguigu* @create: 2024-1-9 10:53*/
@Data
public class CDCEntity {// 注意Column 注解必须是persistence包下的@Column(name = "id")private Long id;
}
4.2 专辑数据更新同步
编写处理器自定义实现EntryHandler接口
EntryHandler
类的作用是处理 Canal 数据变更事件。在阿里巴巴的开源项目 Canal 中,该类用于监听数据库表的数据变更,并将变更的数据进行处理和存储。具体来说,当 Canal 客户端连接到 Canal 服务器并订阅了相应的数据库表后,每当表中的数据发生变更时,Canal 服务器会将变更的数据封装成一个
Entry
对象,然后通过 Canal 协议将这个对象发送给 Canal 客户端。Canal 客户端接收到这个Entry
对象后,会调用EntryHandler
类的相应方法来处理这个对象。
EntryHandler
类通常需要实现以下三个方法:
public interface EntryHandler<T> {//监听到数据添加default void insert(T t) {}//监听到数据修改default void update(T before, T after) {}//监听数据删除default void delete(T t) {}
}
注解@CanalTable
@CanalTable("album_info")
的作用是指定一个表名,用于在MyBatis Plus中与Canal进行数据同步。当MyBatis Plus执行数据库操作时,它会将操作记录到Canal中,然后通过监听器将Canal中的数据同步到目标数据库。通过使用@CanalTable
注解,可以指定要同步的表名,以便只同步特定的表。
专辑同步处理器-AlbumInfoCdcHandler
package com.atguigu.tingshu.handler;import com.atguigu.tingshu.model.CDCEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;/*** @author: atguigu* @create: 2024-1-9 10:53*/
@Slf4j
@Component
@CanalTable("album_info") 监听变更表
public class AlbumInfoCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;/*** 监听到数据修改** @param before 变更前数据* @param after 变更后数据*/@Overridepublic void update(CDCEntity before, CDCEntity after) {log.info("监听到数据修改,ID:{}", after.getId());String key = "album:info:" + after.getId();redisTemplate.delete(key);}/*** 监听到删除操作** @param cdcEntity 删除前数据*/@Overridepublic void delete(CDCEntity cdcEntity) {log.info("监听到数据删除,ID:{}", cdcEntity.getId());String key = "album:info:" + cdcEntity.getId();redisTemplate.delete(key);}
}
4.3 用户信息更新同步
用户信息同步处理器-UserCdcHandler
package com.atguigu.tingshu.handler;import com.atguigu.tingshu.model.CDCEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;/*** @author: atguigu* @create: 2024-1-9 10:53*/
@Slf4j
@Component
@CanalTable("user_info") 监听变更表
public class UserCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;/*** 监听到数据修改** @param before 变更前数据* @param after 变更后数据*/@Overridepublic void update(CDCEntity before, CDCEntity after) {log.info("监听到数据修改,ID:{}", after.getId());String key = "userInfoVo:" + after.getId();redisTemplate.delete(key);}/*** 监听到删除操作** @param cdcEntity 删除前数据*/@Overridepublic void delete(CDCEntity cdcEntity) {log.info("监听到数据删除,ID:{}", cdcEntity.getId());String key = "userInfoVo:" + cdcEntity.getId();redisTemplate.delete(key);}
}
- 监听到数据修改
*- @param before 变更前数据
- @param after 变更后数据
*/
@Override
public void update(CDCEntity before, CDCEntity after) {
log.info(“监听到数据修改,ID:{}”, after.getId());
String key = “userInfoVo:” + after.getId();
redisTemplate.delete(key);
}
/*** 监听到删除操作** @param cdcEntity 删除前数据*/
@Override
public void delete(CDCEntity cdcEntity) {log.info("监听到数据删除,ID:{}", cdcEntity.getId());String key = "userInfoVo:" + cdcEntity.getId();redisTemplate.delete(key);
}
}