欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 会展 > sql server cdc重启监控新加表字段

sql server cdc重启监控新加表字段

2025/1/10 5:34:58 来源:https://blog.csdn.net/shuangyueliao/article/details/144969453  浏览:    关键词:sql server cdc重启监控新加表字段

表新增字段后,SQL Server CDC 不会自动捕获新字段,通常需要对该表先禁用再重新启用 CDC(下文简称“重启”)才能监控新加字段。但如果你有一个定时任务扫描 CDC 的 _CT 表,重启会在以下两个场景导致数据丢失。

  1. CDC重启时捕获不到changelog
  2. 重启前未消费完的CDC数据会被清空

通过以下脚本让cdc监控新字段可以避免数据丢失

下面脚本的DECLARE @TableName NVARCHAR(200) = 'your_table_name'这一行,将your_table_name替换成对应的表名,然后执行即可。

核心原理是为该表新开启一个临时的 CDC 捕获实例(capture instance),给原 _CT 表补上新字段,再用临时实例生成的存储过程定义替换原实例的存储过程;原实例不需要重启,数据也不丢失。

/*
Make an existing CDC capture instance start capturing columns that were added
to its source table, without disabling and re-enabling that instance.

Steps performed for the table named in @TableName:
  1. Enable a temporary capture instance on the source table.
  2. Add the new columns to the original _CT change table.
  3. Register the new columns in cdc.captured_columns.
  4. ALTER the original instance's CDC procedures using the definitions that
     were generated for the temporary instance:
         cdc.sp_batchinsert_<objectid>
         cdc.sp_insdel_<objectid>
         cdc.sp_upd_<objectid>
  5. Disable the temporary capture instance.

NOTE(review): this writes directly to cdc.captured_columns and rewrites
system-generated CDC procedures; presumably unsupported by Microsoft --
verify on a non-production database first.
*/
SET NOCOUNT ON;

DECLARE @Tibble              NVARCHAR(30) = '$tmp$';  /* name prefix of temporary capture instances */
DECLARE @ObjectID            INT;                     /* object_id of the source table */
DECLARE @MaxColumnID         INT;                     /* highest column_id already captured (fetch target only) */
DECLARE @ErrorMessage        NVARCHAR(MAX);
DECLARE @SchemaName          NVARCHAR(128);
DECLARE @ObjectName          NVARCHAR(128);
DECLARE @CaptureInstance     NVARCHAR(128);
DECLARE @TmpCaptureInstance  NVARCHAR(128);
DECLARE @ColumnName          NVARCHAR(128);
DECLARE @ColumnID            INT;
DECLARE @ColumnDataType      NVARCHAR(128);           /* full type text used in ALTER TABLE, e.g. nvarchar(50) */
DECLARE @ColumnTypeName      NVARCHAR(128);           /* bare type name stored in cdc.captured_columns */
DECLARE @SQL                 NVARCHAR(MAX);
DECLARE @ColumnOrdinal       INT;
DECLARE @IsComputed          BIT;
DECLARE @OriginalCDCObjectID INT;                     /* object_id of the original _CT table */
DECLARE @NewCDCObjectID      INT;                     /* object_id of the temporary _CT table */

DECLARE @TableName NVARCHAR(200) = 'your_table_name'; -- target table: replace with the real table name

/* Only proceed when the current database has cdc.change_tables. */
IF EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('cdc.change_tables'))
BEGIN
    /* Temporary capture instances of the target table (possibly left over from a previous run). */
    DECLARE TmpCDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT
            OBJECT_SCHEMA_NAME(source_object_id),
            OBJECT_NAME(source_object_id),
            capture_instance
        FROM cdc.change_tables
        WHERE capture_instance LIKE @Tibble + '%'
          AND OBJECT_NAME(source_object_id) = @TableName;

    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Disabling the capture instance drops its temporary _CT table. */
        EXEC sys.sp_cdc_disable_table
            @source_schema    = @SchemaName,
            @source_name      = @ObjectName,
            @capture_instance = @CaptureInstance;

        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;

    /* Abort if any source table still has more than one capture instance. */
    IF EXISTS (
        SELECT source_object_id, COUNT(*)
        FROM cdc.change_tables
        GROUP BY source_object_id
        HAVING COUNT(*) > 1
    )
    BEGIN
        DEALLOCATE TmpCDCObjects;
        SELECT @ErrorMessage = 'Unable to update CDC as there are objects with more than one capture instance in use. You must update CDC manually';
        THROW 50000, @ErrorMessage, 1;
    END;

    /* Capture instances of the target table whose highest captured column_id
       is lower than the highest column_id of the source table. */
    DECLARE CDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT
            ct.object_id,
            ct.source_object_id,
            MAX(cc.column_id) AS MaxCapturedColumnID
        FROM cdc.captured_columns AS cc
        INNER JOIN cdc.change_tables AS ct
            ON cc.object_id = ct.object_id
        WHERE OBJECT_NAME(ct.source_object_id) = @TableName
        GROUP BY
            ct.source_object_id,
            ct.object_id
        HAVING MAX(cc.column_id) < (
            SELECT MAX(c.column_id)
            FROM sys.columns AS c
            WHERE c.object_id = ct.source_object_id
        );

    OPEN CDCObjects;
    FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        SELECT
            @SchemaName      = OBJECT_SCHEMA_NAME(@ObjectID),
            @ObjectName      = OBJECT_NAME(@ObjectID),
            @CaptureInstance = (
                SELECT TOP 1 capture_instance
                FROM cdc.change_tables
                WHERE source_object_id = @ObjectID
                ORDER BY create_date
            );

        SELECT @TmpCaptureInstance = @Tibble + '_' + @CaptureInstance;

        /* Step 1: enable a second capture instance under a temporary name. */
        EXEC sys.sp_cdc_enable_table
            @source_schema    = @SchemaName,
            @source_name      = @ObjectName,
            @capture_instance = @TmpCaptureInstance,
            @role_name        = 'cdc_Admin';

        /* Source columns whose column_id is above the highest one captured by
           the original instance, with the type text needed to add them. */
        DECLARE cColumns CURSOR LOCAL FAST_FORWARD FOR
            WITH LastColumn AS (
                /* Highest column_id currently captured by the original instance. */
                SELECT
                    ct.source_object_id,
                    MAX(cc.column_id) AS MaxColumnID
                FROM cdc.captured_columns AS cc
                INNER JOIN cdc.change_tables AS ct
                    ON cc.object_id = ct.object_id
                WHERE cc.object_id = @OriginalCDCObjectID
                GROUP BY ct.source_object_id
            )
            SELECT
                c.name,
                c.column_id,
                CASE
                    /* Byte-length types: max_length is the declared length, -1 means MAX. */
                    WHEN t.name IN ('varchar', 'char', 'binary', 'varbinary')
                        THEN t.name + '('
                             + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS NVARCHAR(20)) END
                             + ')'
                    /* Unicode types: max_length is in bytes, two bytes per character. */
                    WHEN t.name IN ('nchar', 'nvarchar')
                        THEN t.name + '('
                             + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS NVARCHAR(20)) END
                             + ')'
                    /* Fractional-second precision is in scale; max_length is the
                       storage size and yields an invalid type such as datetime2(8). */
                    WHEN t.name IN ('datetime2', 'time', 'datetimeoffset')
                        THEN t.name + '(' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* float(n) takes mantissa bits (precision), not the byte size;
                       float(8) would silently create a real column. */
                    WHEN t.name = 'float'
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ')'
                    WHEN t.name IN ('decimal', 'numeric')
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ', ' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    ELSE t.name
                END AS DataType,
                t.name AS TypeName,
                c.is_computed
            FROM sys.columns AS c
            INNER JOIN LastColumn AS lc
                ON c.object_id = lc.source_object_id
            INNER JOIN sys.types AS t
                ON c.user_type_id = t.user_type_id
            WHERE c.column_id > lc.MaxColumnID;

        /* Highest ordinal already registered for the original instance. */
        SELECT @ColumnOrdinal = MAX(column_ordinal)
        FROM cdc.captured_columns
        WHERE [object_id] = @OriginalCDCObjectID;

        OPEN cColumns;
        FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        WHILE (@@FETCH_STATUS = 0)
        BEGIN
            SELECT @ColumnOrdinal += 1;  /* next free ordinal */

            /* Step 2: add the column to the original _CT table. */
            SELECT @SQL = 'ALTER TABLE '
                        + QUOTENAME(OBJECT_SCHEMA_NAME(@OriginalCDCObjectID)) + '.'
                        + QUOTENAME(OBJECT_NAME(@OriginalCDCObjectID))
                        + ' ADD ' + QUOTENAME(@ColumnName) + ' ' + @ColumnDataType + ' NULL';
            PRINT @SQL;
            EXEC (@SQL);

            /* Step 3: register the column in the CDC metadata table. */
            INSERT INTO cdc.captured_columns
                ([object_id], column_name, column_id, column_type, column_ordinal, is_computed)
            VALUES
                (@OriginalCDCObjectID, @ColumnName, @ColumnID, @ColumnTypeName, @ColumnOrdinal, @IsComputed);

            FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        END;
        /* Release the cursor so it can be declared again for the next table. */
        CLOSE cColumns;
        DEALLOCATE cColumns;

        /* object_id of the temporary _CT table created in step 1.
           Reset first so a value from a previous iteration is never reused. */
        SELECT @NewCDCObjectID = NULL;
        SELECT @NewCDCObjectID = [object_id]
        FROM cdc.change_tables
        WHERE source_object_id = @ObjectID
          AND object_id != @OriginalCDCObjectID;

        /* Step 4: for each generated procedure, take the temporary instance's
           definition, turn the leading 6 characters (expected: CREATE) into
           ALTER, and swap the temporary object id / instance name for the
           original ones so the statement targets the original procedure. */

        /* batch insert proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_batchinsert_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* insert/delete proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_insdel_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* update proc */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_upd_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* next capture instance */
        FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    END;
    CLOSE CDCObjects;
    DEALLOCATE CDCObjects;

    /* Step 5: disable every temporary capture instance of the target table.
       Re-opening the cursor re-runs its query. */
    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Use the instance name fetched from the cursor, not the last
           @TmpCaptureInstance computed in the loop above. */
        EXEC sys.sp_cdc_disable_table
            @source_schema    = @SchemaName,
            @source_name      = @ObjectName,
            @capture_instance = @CaptureInstance;

        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;
    DEALLOCATE TmpCDCObjects;
END;
SET NOCOUNT OFF;

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com