目录
1. 环境准备
1.1 MySQL配置
1.2 部署Canal Server
2. Spring Boot项目配置
2.1 添加依赖
2.2 配置参数
3. 实现Canal监听与同步
3.1 Canal客户端监听
3.2 同步到Redis
3.3 同步到Elasticsearch
4. 注意事项
在Spring Boot中通过Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch,可按照以下步骤实现:
1. 环境准备
1.1 MySQL配置
- 开启Binlog并设置为ROW模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
- 创建Canal用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2 部署Canal Server
- 下载Canal Server:Canal Releases
- 修改配置
conf/example/instance.properties
:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..* # 监听所有库表,或指定如test.user
2. Spring Boot项目配置
2.1 添加依赖
<!-- Canal客户端 -->
<dependency><groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version>
</dependency>
<!-- Redis -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置参数
application.yml
:
canal:server: 127.0.0.1:11111destination: exampleusername: canalpassword: canalspring:redis:host: localhostport: 6379data:elasticsearch:cluster-nodes: localhost:9200
3. 实现Canal监听与同步
3.1 Canal客户端监听
@Component
public class CanalListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ElasticsearchRestTemplate esTemplate;@PostConstructpublic void init() {CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");Thread thread = new Thread(() -> {connector.connect(); connector.subscribe(".*\\..*"); while (true) {Message message = connector.getWithoutAck(100); long batchId = message.getId(); if (batchId != -1) {processEntry(message.getEntries()); connector.ack(batchId); }}});thread.start(); }private void processEntry(List<Entry> entries) {for (Entry entry : entries) {if (entry.getEntryType() == EntryType.ROWDATA) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList()) {String tableName = entry.getHeader().getTableName(); EventType eventType = rowChange.getEventType(); // 解析变更前后的数据Map<String, String> before = parseColumns(rowData.getBeforeColumnsList()); Map<String, String> after = parseColumns(rowData.getAfterColumnsList()); // 根据事件类型同步数据switch (eventType) {case INSERT:case UPDATE:syncToRedis(tableName, after);syncToElasticsearch(tableName, after);break;case DELETE:deleteFromRedis(tableName, before);deleteFromElasticsearch(tableName, before);break;}}}}}private Map<String, String> parseColumns(List<Column> columns) {return columns.stream() .collect(Collectors.toMap(Column::getName, Column::getValue));}
}
3.2 同步到Redis
private void syncToRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id"); // 假设主键为idredisTemplate.opsForValue().set(key, data);
}private void deleteFromRedis(String tableName, Map<String, String> data) {String key = tableName + ":" + data.get("id"); redisTemplate.delete(key);
}
3.3 同步到Elasticsearch
private void syncToElasticsearch(String tableName, Map<String, String> data) {IndexQuery indexQuery = new IndexQueryBuilder().withId(data.get("id")) .withObject(data).build();esTemplate.index(indexQuery, IndexCoordinates.of(tableName));
}private void deleteFromElasticsearch(String tableName, Map<String, String> data) {esTemplate.delete(data.get("id"), IndexCoordinates.of(tableName));
}
4. 注意事项
- 异常处理:增加重试机制或记录错误日志,确保网络波动时的数据一致性。
- 性能优化:批量处理Canal消息,减少Redis/ES的频繁写入。
- 数据结构:确保Elasticsearch的索引Mapping与MySQL表结构兼容。
- 事务管理:如需强一致性,可结合本地事务表或消息队列(如RocketMQ)做可靠投递。
通过以上步骤,Spring Boot应用能够实时监听MySQL变更,并自动同步到Redis和Elasticsearch,保障数据一致性。