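Background
By monitoring the MySQL binlog, we capture updates to table fields and use them for real-time display.
Technology used: Flink CDC
Flink CDC is built on log-based Change Data Capture (CDC). It reads the full snapshot and the incremental changes in one unified flow and, with Flink's pipeline capabilities and its rich upstream and downstream ecosystem, captures changes from many kinds of databases and syncs them to downstream storage in real time.
Let's get started: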
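1. Enable the MySQL binlog
# Check whether the binlog is enabled
mysql> SHOW VARIABLES LIKE '%log_bin%';
log_bin: ON means the binlog is enabled. I am using MySQL 5.7 and it was already on in my instance. Note that a stock MySQL 5.7 ships with binary logging off (it is on by default only from MySQL 8.0), so if log_bin shows OFF, set log-bin and server-id in my.cnf and restart the server.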
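The check above can also be automated before the job starts. The following is a minimal sketch, not part of the demo project: BinlogPrerequisites and findProblems are hypothetical names, and the map of server variables is assumed to have been filled from the result of SHOW VARIABLES (for example over JDBC). Besides log_bin, it also checks binlog_format=ROW and binlog_row_image=FULL, which is what the Debezium-based MySQL connector expects as far as I know.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink CDC or the demo project.
class BinlogPrerequisites {

    /**
     * Compares server variables (name -> value, as returned by SHOW VARIABLES)
     * with the settings a binlog-based CDC reader needs.
     * Returns one message per mismatch; an empty list means all checks passed.
     */
    static List<String> findProblems(Map<String, String> serverVariables) {
        // Assumed requirements: binlog on, row-based format, full row images.
        Map<String, String> required = new LinkedHashMap<>();
        required.put("log_bin", "ON");
        required.put("binlog_format", "ROW");
        required.put("binlog_row_image", "FULL");

        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = serverVariables.get(e.getKey());
            if (actual == null || !actual.equalsIgnoreCase(e.getValue())) {
                problems.add(e.getKey() + " should be " + e.getValue() + " but is " + actual);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Sample values standing in for a real SHOW VARIABLES result.
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "STATEMENT");
        vars.put("binlog_row_image", "FULL");
        for (String problem : findProblems(vars)) {
            System.out.println(problem);
        }
    }
}
```

With the sample values in main, only binlog_format is reported, because STATEMENT is not row-based.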
2. Add the dependencies to the pom file
<properties>
    <flink.version>1.17.1</flink.version>
    <flink.cdc.version>2.4.0</flink.cdc.version>
</properties>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>30.1.1-jre-16.1</version>
</dependency>
3. The Flink CDC code
package org.wwj.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Properties;

/**
 * Project: wwj-mysql-flink-cdc
 * Class: MysqlCdc
 * Author: wuwenjin
 * Created: 2024/12/15
 *
 * @version 1.0
 */
@Component
public class MysqlCdc implements ApplicationRunner, Serializable {

    @Override
    public void run(ApplicationArguments arg0) throws Exception {
        Properties debeziumProperties = new Properties();
        // Emit DECIMAL columns as strings; date formats are dealt with later
        debeziumProperties.put("decimal.handling.mode", "string");

        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("wwj")
                .tableList("wwj.studnet")
                .username("root")
                .password("111111")
                .scanNewlyAddedTableEnabled(true)
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema());

        Configuration configuration = new Configuration();
        // Start reading the binlog from the latest position (no initial snapshot)
        sourceBuilder.startupOptions(StartupOptions.latest());
        // Avoid akka timeouts in the Flink cluster
        configuration.setString("akka.ask.timeout", "120s");
        configuration.setString("web.timeout", "300000");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Take a checkpoint every 30 seconds
        env.enableCheckpointing(30000);
        // Set the checkpoint storage path
        env.getCheckpointConfig().setCheckpointStorage("file:///E:/ff-Log");

        // Create the source and set its parallelism
        DataStreamSource<String> streamSource = env
                .fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
                .setParallelism(1);
        // Print every change event to stdout
        streamSource.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
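The code above monitors the studnet table in the wwj database.
4. Result
Change age from 6 to 7 for the row with id=2 in the table. The program then receives the change log for that update. The full record looks like this: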
{
  "before": {"id": "2", "name": "wwj2", "age": "6"},
  "after": {"id": "2", "name": "wwj2", "age": "7"},
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1734417346000,
    "snapshot": "false",
    "db": "wwj",
    "sequence": null,
    "table": "studnet",
    "server_id": 1,
    "gtid": null,
    "file": "logbin.000028",
    "pos": 649462924,
    "row": 0,
    "thread": 1028,
    "query": null
  },
  "op": "u",
  "ts_ms": 1734417346869,
  "transaction": null
}
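before: the row data before the change. after: the row data after the change. op: the operation type, where u = update, c = create (insert), d = delete, r = read (snapshot).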
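To make the op codes easier to work with downstream, they can be mapped to an enum. This is only a sketch: OpCodes and ChangeOperation are hypothetical names, not types from Flink CDC or Debezium. It also encodes which side of the record is populated: create and read events have no before image, and delete events have no after image.

```java
// Hypothetical wrapper class, not part of Flink CDC or Debezium.
class OpCodes {

    /** The four op codes found in the "op" field of a change event. */
    enum ChangeOperation {
        CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

        private final String code;

        ChangeOperation(String code) {
            this.code = code;
        }

        /** Looks up the operation for a raw op code such as "u". */
        static ChangeOperation fromCode(String code) {
            for (ChangeOperation op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown op code: " + code);
        }

        /** Only updates and deletes carry the old row in "before". */
        boolean hasBefore() {
            return this == UPDATE || this == DELETE;
        }

        /** Every event except a delete carries the new row in "after". */
        boolean hasAfter() {
            return this != DELETE;
        }
    }

    public static void main(String[] args) {
        ChangeOperation op = ChangeOperation.fromCode("u");
        System.out.println(op + " hasBefore=" + op.hasBefore() + " hasAfter=" + op.hasAfter());
    }
}
```

Checking hasBefore and hasAfter before reading the corresponding field avoids null handling scattered across the consumer code.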
Once you have the change data, you can analyze it further: extract the fields that changed, then trigger alerts or other actions.
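Extracting the changed fields from an update event could look roughly like the sketch below. ChangedFields and diff are hypothetical names, and the before and after objects are assumed to have already been parsed from the JSON string into maps by a JSON library of your choice; that parsing step is left out here. The sample values are the ones from the record shown in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of the demo project.
class ChangedFields {

    /**
     * Returns every column whose value differs between the two row images,
     * mapped to a two-element array {oldValue, newValue}.
     * Intended for update events, where both images are present.
     */
    static Map<String, String[]> diff(Map<String, String> before, Map<String, String> after) {
        Map<String, String[]> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            String oldValue = before.get(e.getKey());
            // Objects.equals treats two nulls as equal and handles a null on either side.
            if (!Objects.equals(oldValue, e.getValue())) {
                changed.put(e.getKey(), new String[] {oldValue, e.getValue()});
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "2");
        before.put("name", "wwj2");
        before.put("age", "6");

        Map<String, String> after = new LinkedHashMap<>(before);
        after.put("age", "7");

        for (Map.Entry<String, String[]> e : diff(before, after).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue()[0] + " -> " + e.getValue()[1]);
        }
    }
}
```

For the sample row this reports only the age column, changing from 6 to 7. An alerting rule can then be keyed on the column name instead of comparing whole rows.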
Project demo: https://github.com/qianchenyimeng/wwj-mysql-flink-cdc