今天是清明节,放假第一天也不得清闲。上午整理公司的交付文档,中午陪孩子户外骑行踏青,下午趁着休息的时间给老铁们讲下如何使用postgressql cdc插件来实时捕获数据。
注:CDC (Change Data Capture) 是一种技术,用于实时捕获和同步数据库中的更改。
1、基于docker搭建 postgressql环境。
1)创建容器
docker run -d \
--name postgres-cdc \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=123456 \
-e POSTGRES_DB=testdb \
-p 5432:5432 \
-v /Users/xxx/Documents/docker/pgdata:/var/lib/postgresql/data \
postgres \
postgres -c wal_level=logical -c max_replication_slots=10 -c max_wal_senders=10
2)进入容器
docker exec -it postgres-cdc psql -U postgres -d testdb
3)创建复制槽
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
4)创建发布
CREATE PUBLICATION debezium_publication FOR ALL TABLES;
2、将postgresql-cdc-1.0插件放到kettle的plugins目录下面,重启spoon。
3、设计转换,如下图所示:
1)本次转换用到了Postgresql CDC、JSON输入、Switch/case、写日志步骤。
Postgresql CDC步骤:解析Postgresql的日志数据。
JSON输入:解析日志数据,如{"ts_ms":1743757150325,"db":"testdb","table":"t1","op":"DELETE","before":"{\"id\":1}","pk":"{\"id\":1}"}
Switch/case:根据日志中的op字段进行路由日志数据,进行不同逻辑的处理。
写日志:模拟具体的逻辑操作。
2)Postgresql CDC配置
Debezium步骤配置如下(只输出dml),更多属性配置参考Debezium官网。如下图所示:
4、保存&运行
运行之后该转换会一直处于运行状态,只要数据源有发生变更,会实时接收到变更日志。
1)插入数据,sql为insert into t1(id,"name")values('1','Java小金刚'),转换接收数据如下图所示:
2)更新数据,update t1 set name='Java大金刚' where id='1';,转换接收数据如下图所示:
3)删除数据,delete from t1 where id='1',转换接收数据如下图所示:
done!!!