欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > SpringBoot整合Canal+RabbitMQ监听数据变更

SpringBoot整合Canal+RabbitMQ监听数据变更

2025/2/23 16:31:16 来源:https://blog.csdn.net/wls_gk/article/details/144767861  浏览:    关键词:SpringBoot整合Canal+RabbitMQ监听数据变更

需求

在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 MySQL 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。

Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。

步骤

  1. 环境搭建
  2. 整合SpringBoot与Canal实现客户端
  3. Canal整合RabbitMQ
  4. SpringBoot整合RabbitMQ

环境搭建

1. 安装MySQL

确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重启 MySQL 服务使配置生效。

2. 安装Canal Server

下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行必要的配置,特别是 instance.properties 文件中的数据库连接信息。

3. 安装RabbitMQ

可以通过 Docker 快速安装 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。

整合SpringBoot与Canal实现客户端
创建SpringBoot项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。

引入Canal依赖

pom.xml 中添加 Canal Client 的依赖:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version>
</dependency>
编写Canal客户端代码

创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;public class CanalClient {private final CanalConnector connector;private final Channel channel;public CanalClient(CanalConnector connector, Channel channel) {this.connector = connector;this.channel = channel;}public void start() {// Canal 连接配置connector.connect();connector.subscribe(".*\\..*"); // 订阅所有数据库和表connector.rollback();while (true) {int batchSize = 1000;EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据long batchId = batch.getId();int size = batch.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {printEntry(batch.getEntries());connector.ack(batchId); // 提交确认}if (Thread.currentThread().isInterrupted()) {break;}}connector.disconnect();}private void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {sendToRabbitMQ(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {sendToRabbitMQ(rowData.getAfterColumnsList());} else {System.out.println("-------> before");sendToRabbitMQ(rowData.getBeforeColumnsList());System.out.println("-------> after");sendToRabbitMQ(rowData.getAfterColumnsList());}}}}private void sendToRabbitMQ(List<Column> columns) {StringBuilder message = new StringBuilder();for (Column column : columns) {message.append(column.getName()).append("=").append(column.getValue()).append(",");}try {channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());} catch (IOException e) {e.printStackTrace();}}
}

Canal整合RabbitMQ

配置Canal Server

确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。

配置RabbitMQ Exchange

在 RabbitMQ 中创建一个名为 canal_exchange 的 exchange,类型可以根据需要选择,如 fanout, direct, topicheaders

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依赖

确保在 pom.xml 中已经包含了 RabbitMQ 的 Spring AMQP 依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ连接信息

application.ymlapplication.properties 中配置 RabbitMQ 的连接参数。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
创建消费者

编写一个消费者类来接收来自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class CanalMessageConsumer {@RabbitListener(queues = "canal_queue")public void receive(String message) {System.out.println("Received message: " + message);}
}
配置队列和绑定

确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue canalQueue() {return new Queue("canal_queue", false);}@Beanpublic TopicExchange canalExchange() {return new TopicExchange("canal_exchange");}@Beanpublic Binding binding(Queue canalQueue, TopicExchange canalExchange) {return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");}
}
总结

通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。


版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词