MySQL数据准备
-- Prepare the demo database and the table that the Flink CDC job captures.
-- utf8mb4 is declared explicitly so the Chinese names below are stored
-- correctly regardless of the server's default character set.
create database if not exists test default character set utf8mb4;
use test;

-- Recreate the table from scratch so this script can be re-run safely.
drop table if exists stu;
create table stu (
    id int primary key auto_increment,
    name varchar(100),
    age int
) default character set utf8mb4;

-- String literals use single quotes: when sql_mode contains ANSI_QUOTES,
-- double-quoted tokens are parsed as identifiers and the inserts would fail.
insert into stu (name, age) values
    ('张三', 18),
    ('李四', 20),
    ('王五', 21);
注意:表必须有主键,Flink CDC 读取全量数据时默认依据主键对表进行切分
开启MySQL binlog
修改MySQL配置,开启binlog
$ sudo vim /etc/my.cnf
在 [mysqld] 配置段下添加如下设置:
# Numeric ID identifying this MySQL server
server-id = 1
# Enable the binary log; log files use the base name "mysql-bin"
log-bin = mysql-bin
# Record row-level changes in the binary log
binlog_format = row
# Only write changes of the "test" database to the binary log
binlog-do-db = test
注意:binlog-do-db 用于指定需要记录 binlog 的数据库,请根据实际使用的数据库名修改(本例为 test)
重启mysql
$ sudo systemctl restart mysqld
代码开发
依赖
Flink CDC依赖
<!-- Flink CDC connector for MySQL -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
<!-- Flink Table API bridge for the Java DataStream API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
完整依赖
<!-- Build settings and version properties shared by the dependencies below -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.17.1</flink.version>
    <!-- Property name fixed from the misspelled "flink-cdc.vesion" -->
    <flink-cdc.version>2.4.0</flink-cdc.version>
</properties>

<!-- Each artifact is declared exactly once; the original list repeated several
     Flink table/file artifacts, one of them with a conflicting scope. -->
<dependencies>
    <!-- Flink core / DataStream API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Connectors -->
    <!-- Default (compile) scope so the classes are available when running from the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <!-- Released JDBC connector for Flink 1.17; replaces the 1.17-SNAPSHOT version,
         which cannot be resolved from Maven Central -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-datagen</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- State backends -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-changelog</artifactId>
        <version>${flink.version}</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Hadoop client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>1.3.9</version>
    </dependency>

    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
</dependencies>
Flink代码
Flink CDC捕获MySQL变更数据(增加、修改、删除),输出到控制台。
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDCDemo {public static void main(String[] args) throws Exception {// 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 数据源MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node4").port(3306).username("root").password("000000").databaseList("test").tableList("test.stu").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"mysql_source").setParallelism(1);// 处理数据// 输出数据dataStreamSource.print();// 执行env.execute();}
}
运行程序,确保程序无报错,看到如下输出:
18:58:51,826 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread is running
测试
添加数据
mysql添加数据
mysql> insert into stu(name, age) values("赵六",23);
IDEA控制台输出
{"before":null,"after":{"id":4,"name":"赵六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}
格式化输出
{
  "before": null,
  "after": {
    "id": 4,
    "name": "赵六",
    "age": 23
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719831654000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2300,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "c",
  "ts_ms": 1719831654692,
  "transaction": null
}
关注before、after:新增时before为null,after为新插入的行,符合增加数据的逻辑;op为c(create)表示添加数据
修改数据
mysql修改数据
mysql> update stu set name="zl", age=19 where name="赵六";
IDEA控制台输出
{"before":{"id":4,"name":"赵六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}
格式化输出
{
  "before": {
    "id": 4,
    "name": "赵六",
    "age": 23
  },
  "after": {
    "id": 4,
    "name": "zl",
    "age": 19
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719831987000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2604,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "u",
  "ts_ms": 1719831987238,
  "transaction": null
}
关注before、after:before为修改前的行,after为修改后的行,符合更新的逻辑;op为u(update)表示更新数据
删除数据
mysql删除数据
mysql> delete from stu where id=4;
IDEA控制台输出
{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
格式化输出
{
  "before": {
    "id": 4,
    "name": "zl",
    "age": 19
  },
  "after": null,
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719832151000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2913,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "d",
  "ts_ms": 1719832151198,
  "transaction": null
}
关注before、after:before为被删除的行,after为null,符合删除的逻辑;op为d(delete)表示删除数据
完成!enjoy it!