欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > flinkCDC mysql

flinkCDC mysql

2025/2/27 2:53:41 来源:https://blog.csdn.net/zhangyupeng0528/article/details/143326905  浏览:    关键词:flinkCDC mysql

1.首先必须在要同步到源端和目标端创建好表

2.源端flinksql语句如下:

DROP TABLE IF EXISTS FLINK_SOURCE_hrpt_dtl;
create table FLINK_SOURCE_hrpt_dtl(
`template_dtl_id`bigint NOT NULL ,`template_id`bigint NOT NULL COMMENT '报表模板ID,hrpt_template.template_id',`template_url` varchar(480)   COMMENT '模板路径',`template_content` string  COMMENT '模板内容',`lang` varchar(30)  NOT NULL COMMENT '模板语言',`enabled_flag` tinyint NOT NULL  COMMENT '启用标识',`tenant_id`bigint NOT NULL  COMMENT '租户ID,hpfm_tenant.tenant_id',`object_version_number`bigint NOT NULL  COMMENT '行版本号,用来处理锁',`creation_date` timestamp NOT NULL,`created_by`bigint NOT NULL ,`last_updated_by`bigint NOT NULL ,`last_update_date` timestamp NOT NULL,PRIMARY KEY (`template_dtl_id`) NOT ENFORCED
)with (
'connector' = 'mysql-cdc',
'hostname' = '1.6.2.13',
'port' = '3306',
'username' = 'test',
'password' = 'test@123',
'database-name' = 'report',
'table-name' = 'hrpt_dtl',
'scan.incremental.snapshot.enabled' = 'false',
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.log.mining.strategy' = 'online_catalog',
'server-time-zone' = 'Asia/Shanghai',
'debezium.log.mining.continuous.mine' = 'true'
);

3.目标端flinksql模板

DROP TABLE IF EXISTS FLINK_TARGET_hrpt_dtl;
create table FLINK_TARGET_hrpt_dtl(
`template_dtl_id`bigint NOT NULL ,`template_id`bigint NOT NULL COMMENT '报表模板ID,hrpt_template.template_id',`template_url` varchar(480)   COMMENT '模板路径',`template_content` string  COMMENT '模板内容',`lang` varchar(30)  NOT NULL COMMENT '模板语言',`enabled_flag` tinyint NOT NULL  COMMENT '启用标识',`tenant_id`bigint NOT NULL  COMMENT '租户ID,hpfm_tenant.tenant_id',`object_version_number`bigint NOT NULL  COMMENT '行版本号,用来处理锁',`creation_date` timestamp NOT NULL,`created_by`bigint NOT NULL ,`last_updated_by`bigint NOT NULL ,`last_update_date` timestamp NOT NULL,PRIMARY KEY (`template_dtl_id`) NOT ENFORCED
)with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.6.28.10:3306/cdc?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&serverTimezone=Asia/Shanghai',
'username' = 'test',
'password' = 'test@123',
'table-name' = 'hrpt_dtl',
'driver' = 'com.mysql.jdbc.Driver',
'scan.fetch-size' = '200'
);

4.把源端数据插入到目标端

insert into FLINK_TARGET_hrpt_dtl select * from FLINK_TARGET_hrpt_dtl ;

以上就是flink_CDC的全部过程。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词