SpringBoot3.4.0集成Canal1.1.7实现MySQL实时同步数据到Redis
SpringBoot3.4.0集成Canal1.1.7实现MySQL实时同步数据到Redis

贝恩聊架构-SpringBoot3专栏内容定位主要为SpringBoot3整合各大框架和中间件,通过输出入门技术文章和基于源代码的技术案例,让读者基于本系列技术文章和源代码快速掌握SpringBoot3和各大开源框架、中间件的整合应用。给技术开发者节约环境搭建和框架整合所耗费的时间,开发人员都知道在框架整合的过程中会遇到各种技术问题和环境问题,所以参考已有搭建好的框架及功能能大大提高学习和工作效率。
Canal1.1.7环境搭建可以看上一篇文章MySQL增量数据同步利器Canal1.1.7环境搭建流程,今天我们基于SpringBoot3.4.0、Canal1.1.7、MySQL5.7.x、Redis6.x完成数据库业务数据到Redis分布式缓存的同步
基于MySQL日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
完整代码在文章最后,如果觉得本篇文章对你有用,记得点赞、关注、收藏哦。你的支持是我持续更新的动力!
SpringBoot3专栏软件环境
- JDK17.0.12
- SpringBoot3.4.0
- redisson-spring-boot-starter3.38.1
- Redis6.x
- Canal-Server1.1.7
- Canal-Admin1.1.7
- Canal-Client1.1.7
- IDEA2024.2.0.2
我们先看本篇文章对应的项目结构,请看下图

1 项目搭建
1.1 Canal项目依赖项
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.itbeien</groupId>
<artifactId>springboot3-labs-master</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-canal</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<canal.client-version>1.1.7</canal.client-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.client-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.client-version}</version>
</dependency>
<!-- <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>-->
</dependencies>
</project>
1.2 配置信息
#application.properties
server.port=2001
server.servlet.context-path=/canal
#canal
canal-monitor-mysql.host=192.168.0.105
#canal.properties canal.port
canal-monitor-mysql.port=11111
spring.data.redis.host=192.168.0.104
spring.data.redis.port=6379
spring.data.redis.password=Rootpwd20240809
# redis数据库编号
spring.data.redis.database=8
1.3 代码实现
canal实时从mysql获取数据,同步到分布式缓存redis,完成业务缓存刷新
package cn.itbeien.canal.util;
import cn.itbeien.canal.entity.SysUser;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Slf4j
@Component
public class CanalUtil {
@Value("${canal-monitor-mysql.host}")
String canalMonitorHost;
@Value("${canal-monitor-mysql.port}")
Integer canalMonitorPort;
@Autowired
private RedisClient redisClient;
private final static int BATCH_SIZE = 10000;
/**
* 启动服务
*/
// @Async("TaskPool")
public void startMonitorSQL() {
while (true) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "0.104", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
log.info("empty count :{} " , emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
log.info("empty too many times, exit");
} catch (Exception e) {
log.error("成功断开监测连接!尝试重连:{}",e);
} finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
log.error("成功断开监测连接!尝试重连:{}",e);
}
}
}
}
private void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//canal获取mysql数据库删除事件
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {//canal获取mysql数据库新增事件
printColumn(rowData.getAfterColumnsList());
} else {
log.info("-------> before");
printColumn(rowData.getBeforeColumnsList());
log.info("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private void printColumn(List<CanalEntry.Column> columns) {
SysUser sysUser = new SysUser();
for (CanalEntry.Column column : columns) { //一行数据库数据=一个对象
log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
//获取字段名称和字段值,设置到实体类中
if(column.getName().equalsIgnoreCase("id")){
sysUser.setId(column.getValue());
}else if(column.getName().equalsIgnoreCase("name")){
sysUser.setName(column.getValue());
}else if(column.getName().equalsIgnoreCase("age")){
sysUser.setAge(Integer.valueOf(column.getValue()));
}else if(column.getName().equalsIgnoreCase("email")){
sysUser.setEmail(column.getValue());
}
}
if(sysUser.getId()!=null && !"".equals(sysUser.getId())){
String userJson = JSON.toJSONString(sysUser);
redisClient.set(sysUser.getId(),userJson);//保存用户数据
}
log.info(sysUser.toString());
}
}
2 MySQL数据同步到Redis
2.1 测试代码
package cn.itbeien.canal.test;
import cn.itbeien.canal.util.CanalUtil;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@SpringBootTest
public class CanalApplication {
@Autowired
private CanalUtil canalUtil;
@Test
public void test(){
this.canalUtil.startMonitorSQL();
}
}
2.2 环境准备
2.2.1 启动canal-admin
具体启动方式见上一篇文章

2.2.2 启动canal-server
具体启动方式见上一篇文章

2.2.3 启动canal-instance

2.2.4 启动canal-client
启动canal-client监听mysql增量数据,运行cn.itbeien.canal.test.CanalApplication

3 整体流程测试
在MySQL中新增一条数据

在canal-client端进行数据变更的监听

最后我们查询redis分布式缓存是否有id为88的这条数据

以上就是我们今天SpringBoot整合Canal完成业务数据从mysql到redis同步的整体流程,完整代码在文章最后获取!
欢迎大家关注我的项目实战内容itbeien.cn,一起学习一起进步,在项目和业务中理解各种技术。

欢迎沟通交流技术和支付业务,一起探讨聚合支付/预付卡系统业务、技术、系统架构、微服务、容器化。并结合聚合支付系统深入技术框架/微服务原理及分布式事务原理。加入我的知识星球吧

SpringBoot3专栏
01SpringBoot3专栏-SpringBoot3.4.0整合Mybatis-plus和Mybatis
02SpringBoot3.4.0结合Mybatis-plus实现动态数据源
03mapstruct对象映射在Springboot3中这样用就对了
04RocketMQ5.3.1集成SpringBoot3.4.0就这样简单
05SpringBoot3.4.0整合Redisson实现分布式锁
06MySQL增量数据同步利器Canal1.1.7环境搭建流程
跟着我学微服务系列
01跟着我学微服务,什么是微服务?微服务有哪些主流解决方案?
05SpringCloudAlibaba之图文搞懂微服务核心组件在企业级支付系统中的应用
06JDK17+SpringBoot3.4.0+Netty4.1.115搭建企业级支付系统POS网关
07JDK17+SpringCloud2023.0.3搭建企业级支付系统-预付卡支付交易微服务
08JDK17+Dubbo3.3.2搭建企业级支付系统-预付卡支付交易微服务
09JDK17+SpringBoot3.3.6+Netty4.1.115实现企业级支付系统POS网关签到功能
贝恩聊架构-项目实战地址
4 源码地址
贝恩聊架构-SpringBoot3专栏系列文章、资料和源代码会同步到以下地址,代码和资料每周都会同步更新
该仓库地址主要用于贝恩聊架构-SpringBoot3专栏、基于企业级支付系统,学习微服务整体技术栈
