Real-Time Data Capture from MySQL with Flink CDC

2025/2/25 13:47:36  Source: https://blog.csdn.net/WUWENJINWUWENJIN/article/details/144534557

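Background

The goal is to watch the MySQL log for updates to table fields and use them for real-time display.

Technology used: Flink CDC

Flink CDC builds on database-log-based Change Data Capture (CDC) to read full snapshots and incremental changes in one unified flow. Backed by Flink's pipeline capabilities and its broad upstream and downstream ecosystem, it can capture changes from many kinds of databases and synchronize them to downstream storage in real time.

Let's get started: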

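1. Enable the binlog in MySQL

# Check whether the binlog is enabled
mysql> SHOW VARIABLES LIKE '%log_bin%';

log_bin = ON means the binlog is enabled. I am using MySQL 5.7, and it was already on in my instance. Note that a stock MySQL 5.7 install keeps binary logging off until log_bin (together with server_id) is set in the server configuration; it is on by default only from MySQL 8.0. The MySQL CDC connector also requires binlog_format=ROW.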
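Since the job depends on these server variables, it can be useful to validate them before starting the job. The sketch below is a hypothetical helper (`BinlogSettingsCheck` and `findProblems` are illustrative names, not part of Flink CDC). It assumes the variable values have already been fetched, for example over JDBC with `SHOW VARIABLES`, and it only checks the two settings this article relies on: `log_bin=ON` and `binlog_format=ROW`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Flink CDC: validates values read from SHOW VARIABLES. */
final class BinlogSettingsCheck {

    private BinlogSettingsCheck() {}

    /**
     * @param variables variable name -> value, e.g. collected over JDBC from SHOW VARIABLES
     * @return human-readable problems; empty when both settings look usable for CDC
     */
    static List<String> findProblems(Map<String, String> variables) {
        List<String> problems = new ArrayList<>();
        requireValue(variables, "log_bin", "ON", problems);
        requireValue(variables, "binlog_format", "ROW", problems);
        return problems;
    }

    // Records a problem when the variable is missing or differs from the expected value.
    private static void requireValue(Map<String, String> variables, String name,
                                     String expected, List<String> problems) {
        String actual = variables.get(name);
        if (actual == null) {
            problems.add(name + " was not reported by the server");
        } else if (!expected.equalsIgnoreCase(actual)) {
            problems.add(name + " is " + actual + ", expected " + expected);
        }
    }

    public static void main(String[] args) {
        // prints "[binlog_format is MIXED, expected ROW]"
        System.out.println(findProblems(Map.of("log_bin", "ON", "binlog_format", "MIXED")));
    }
}
```

An empty list means the job can be started. Otherwise the messages indicate which setting to change in the MySQL configuration.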

2. Add the dependencies to the pom file

<properties>
    <flink.version>1.17.1</flink.version>
    <flink.cdc.version>2.4.0</flink.cdc.version>
</properties>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>30.1.1-jre-16.1</version>
</dependency>

3. The Flink CDC code

package org.wwj.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Properties;

/**
 * Project: wwj-mysql-flink-cdc
 * Class: MysqlCdc
 * Author: wuwenjin
 * Created: 2024/12/15
 *
 * @version 1.0
 */
@Component
public class MysqlCdc implements ApplicationRunner, Serializable {

    @Override
    public void run(ApplicationArguments arg0) throws Exception {
        Properties debeziumProperties = new Properties();
        // Emit DECIMAL columns as strings; date formats are handled later
        debeziumProperties.put("decimal.handling.mode", "string");

        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("wwj")
                .tableList("wwj.studnet")
                .username("root")
                .password("111111")
                .scanNewlyAddedTableEnabled(true)
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema());

        Configuration configuration = new Configuration();
        // Start reading from the latest binlog position
        sourceBuilder.startupOptions(StartupOptions.latest());
        // Avoid akka timeouts on the Flink cluster
        configuration.setString("akka.ask.timeout", "120s");
        configuration.setString("web.timeout", "300000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Take a checkpoint every 30 s
        env.enableCheckpointing(30000);
        // Checkpoint storage path
        env.getCheckpointConfig().setCheckpointStorage("file:///E:/ff-Log");

        // Attach the source and set its parallelism
        DataStreamSource<String> streamSource = env
                .fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
                .setParallelism(1);
        streamSource.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The code above monitors the studnet table in the wwj database.

4. Result

Update the row with id=2 in the table, changing age from 6 to 7. The program then receives a change event; its full format is:

{
  "before": {
    "id": "2",
    "name": "wwj2",
    "age": "6"
  },
  "after": {
    "id": "2",
    "name": "wwj2",
    "age": "7"
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1734417346000,
    "snapshot": "false",
    "db": "wwj",
    "sequence": null,
    "table": "studnet",
    "server_id": 1,
    "gtid": null,
    "file": "logbin.000028",
    "pos": 649462924,
    "row": 0,
    "thread": 1028,
    "query": null
  },
  "op": "u",
  "ts_ms": 1734417346869,
  "transaction": null
}

before: the row before the change. after: the row after the change. op: the operation type, one of u (update), c (create), d (delete), or r (read).
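The `op` codes are easier to handle once they are mapped to a type. The following is a minimal sketch that assumes only the four codes listed above. `ChangeOp`, `fromCode`, `hasBefore`, and `hasAfter` are illustrative names, not part of Flink CDC or Debezium. The before/after rules follow the usual Debezium convention that inserts and snapshot reads carry no `before` image and deletes carry no `after` image.

```java
/** Illustrative mapping of the event's "op" field to a Java enum (names are hypothetical). */
enum ChangeOp {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    String code() {
        return code;
    }

    /** Returns the operation for a code taken from the event's "op" field. */
    static ChangeOp fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown op code: " + code);
    }

    /** Updates and deletes carry the row as it was before the change. */
    boolean hasBefore() {
        return this == UPDATE || this == DELETE;
    }

    /** Every operation except a delete carries the row as it is after the change. */
    boolean hasAfter() {
        return this != DELETE;
    }
}
```

For the sample event, `ChangeOp.fromCode("u")` yields `UPDATE`, so both images are present and can be compared.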

Once you have the change data, you can analyze it further, extract the fields that changed, and trigger alerts or other actions.
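Extracting the changed fields comes down to comparing the two row images. The sketch below is one possible way to do it, not the author's implementation. It assumes the `before` and `after` objects have already been parsed into maps with a JSON library of your choice, and `ChangedFields.diff` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative helper: compares the "before" and "after" images of an update event. */
final class ChangedFields {

    private ChangedFields() {}

    /**
     * @param before column -> value before the change (already parsed from the event JSON)
     * @param after  column -> value after the change
     * @return column -> {oldValue, newValue} for every column in "after" whose value differs
     */
    static Map<String, Object[]> diff(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object[]> changed = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            Object oldValue = before.get(entry.getKey());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changed.put(entry.getKey(), new Object[] {oldValue, entry.getValue()});
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Same values as the sample event above
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", "2");
        before.put("name", "wwj2");
        before.put("age", "6");
        Map<String, Object> after = new LinkedHashMap<>(before);
        after.put("age", "7");

        // prints "age: 6 -> 7"
        diff(before, after).forEach((column, values) ->
                System.out.println(column + ": " + values[0] + " -> " + values[1]));
    }
}
```

The result can feed an alerting rule directly, for example by checking whether a watched column appears among the keys. Columns that exist only in `before` are ignored here, which is enough for updates where the table schema has not changed.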

Project demo: https://github.com/qianchenyimeng/wwj-mysql-flink-cdc
