表新增字段后,CDC 需要重启才能捕获新增的字段。但如果有定时任务在扫描 CDC 的 _CT 表,重启 CDC 会在以下两种情况下导致数据丢失:
- CDC重启时捕获不到changelog
- 重启前未消费完的CDC数据会被清空
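使用以下脚本让 CDC 捕获新增字段,可以避免上述数据丢失。
使用方法:将下面脚本中 DECLARE @TableName NVARCHAR(200) = 'your_table_name' 这一行里的 your_table_name 替换成对应的表名,然后执行即可。
核心原理:为该表新开启一个临时的 CDC 捕获实例,把新增字段补到原 CT 表和 cdc.captured_columns 中,再用临时捕获实例的存储过程定义替换原捕获实例的存储过程,最后删除临时捕获实例。整个过程不需要重启 CDC,数据也不会丢失。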
/*
表新加字段可以通过下面5个步骤实现
1、新开启一个临时捕捉实例
2、CT表增加字段
3、把新加字段写到cdc.captured_columns
4、修改cdc的存储过程,用临时捕捉实例的替换
cdc.sp_batchinsert_<objectid>
cdc.sp_insdel_<objectid>
cdc.sp_upd_<objectid>
5、删掉临时捕捉实例
*/
/*
 * Makes an existing CDC capture instance pick up columns that were added to
 * its source table, without disabling and re-enabling the original instance.
 *
 * Usage: set @TableName below to the source table name (no schema) and run
 * the batch in the CDC-enabled database.
 *
 * Outline:
 *   1. Drop temporary capture instances left over from an earlier run.
 *   2. Create a temporary capture instance for the table.
 *   3. Add the missing columns to the original change table and register
 *      them in cdc.captured_columns.
 *   4. Re-create the original instance's cdc.sp_batchinsert_/sp_insdel_/
 *      sp_upd_ procedures from the temporary instance's definitions.
 *   5. Drop the temporary capture instance.
 */
SET NOCOUNT ON;

DECLARE @Tibble NVARCHAR(30) = '$tmp$';      /* name prefix of the temporary capture instance */
DECLARE @ObjectID INT;                       /* object_id of the source table */
DECLARE @MaxColumnID INT;                    /* highest source column_id already captured */
DECLARE @ErrorMessage NVARCHAR(MAX);
DECLARE @SchemaName NVARCHAR(128);
DECLARE @ObjectName NVARCHAR(128);
DECLARE @CaptureInstance NVARCHAR(128);
DECLARE @TmpCaptureInstance NVARCHAR(128);
DECLARE @ColumnName NVARCHAR(128);
DECLARE @ColumnID INT;
DECLARE @ColumnDataType NVARCHAR(128);       /* full type declaration, e.g. nvarchar(50) */
DECLARE @ColumnTypeName NVARCHAR(128);       /* bare type name stored in cdc.captured_columns */
DECLARE @SQL NVARCHAR(MAX);
DECLARE @ColumnOrdinal INT;
DECLARE @IsComputed BIT;
DECLARE @OriginalCDCObjectID INT;            /* object_id of the original change table */
DECLARE @NewCDCObjectID INT;                 /* object_id of the temporary change table */
DECLARE @TableName NVARCHAR(200) = 'your_table_name'; -- source table to process

/* Do nothing unless cdc.change_tables exists in the current database. */
IF EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('cdc.change_tables'))
BEGIN
    /*
     * Temporary capture instances of the target table. The cursor is opened
     * twice: now, to remove leftovers from a previous run, and again at the
     * end, to remove the instances created by this run.
     */
    DECLARE TmpCDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT
            OBJECT_SCHEMA_NAME(source_object_id),
            OBJECT_NAME(source_object_id),
            capture_instance
        FROM cdc.change_tables
        WHERE capture_instance LIKE @Tibble + '%'
          AND OBJECT_NAME(source_object_id) = @TableName;

    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Disabling the capture instance drops its change table. */
        EXEC sys.sp_cdc_disable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @CaptureInstance;

        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;

    /*
     * The swap below assumes exactly one (original) capture instance per
     * table. Only the target table is checked, matching the @TableName filter
     * used by every other query in this script.
     */
    IF EXISTS (
        SELECT source_object_id
        FROM cdc.change_tables
        WHERE OBJECT_NAME(source_object_id) = @TableName
        GROUP BY source_object_id
        HAVING COUNT(*) > 1
    )
    BEGIN
        DEALLOCATE TmpCDCObjects;
        SELECT @ErrorMessage = 'Unable to update CDC as there are objects with more than one capture instance in use. You must update CDC manually';
        THROW 50000, @ErrorMessage, 1;
    END;

    /*
     * Change tables of the target table whose highest captured column_id is
     * lower than the highest column_id of the source table, i.e. the source
     * has columns the change table does not know about yet.
     */
    DECLARE CDCObjects CURSOR LOCAL FAST_FORWARD FOR
        SELECT
            ct.object_id,
            ct.source_object_id,
            MAX(cc.column_id) AS MaxCapturedColumnID
        FROM cdc.captured_columns AS cc
        INNER JOIN cdc.change_tables AS ct
            ON cc.object_id = ct.object_id
        WHERE OBJECT_NAME(ct.source_object_id) = @TableName
        GROUP BY
            ct.source_object_id,
            ct.object_id
        HAVING MAX(cc.column_id) < (
            SELECT MAX(c.column_id)
            FROM sys.columns AS c
            WHERE c.object_id = ct.source_object_id
        );

    OPEN CDCObjects;
    FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        SELECT
            @SchemaName = OBJECT_SCHEMA_NAME(@ObjectID),
            @ObjectName = OBJECT_NAME(@ObjectID),
            @CaptureInstance = (
                SELECT TOP (1) capture_instance
                FROM cdc.change_tables
                WHERE source_object_id = @ObjectID
                ORDER BY create_date
            );
        SELECT @TmpCaptureInstance = @Tibble + '_' + @CaptureInstance;

        /* Step 2: create the temporary capture instance under a temporary name. */
        EXEC sys.sp_cdc_enable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @TmpCaptureInstance,
            @role_name = 'cdc_Admin';

        /*
         * Locate the temporary change table before touching the original one.
         * The variable is reset first because an assignment SELECT that
         * returns no row would keep the value from a previous iteration.
         */
        SELECT @NewCDCObjectID = NULL;
        SELECT @NewCDCObjectID = [object_id]
        FROM cdc.change_tables
        WHERE source_object_id = @ObjectID
          AND [object_id] <> @OriginalCDCObjectID;

        IF @NewCDCObjectID IS NULL
        BEGIN
            /* Without the temporary instance there is nothing to copy the
               procedures from; stop before the original change table is altered. */
            SELECT @ErrorMessage = 'Unable to update CDC as the temporary capture instance was not created. The original capture instance was left unchanged';
            THROW 50000, @ErrorMessage, 1;
        END;

        /*
         * Step 3: source columns not yet captured by the original instance.
         * @MaxColumnID is the highest captured column_id of the original
         * change table (fetched from CDCObjects above). Ordered by column_id
         * so that ordinals are assigned deterministically.
         */
        DECLARE cColumns CURSOR LOCAL FAST_FORWARD FOR
            SELECT
                c.name,
                c.column_id,
                CASE
                    /* byte-length types: max_length is the declared length, -1 means MAX */
                    WHEN t.name IN ('varchar', 'char', 'binary', 'varbinary')
                        THEN t.name + '('
                             + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS NVARCHAR(20)) END
                             + ')'
                    /* Unicode types: max_length is in bytes, two per character */
                    WHEN t.name IN ('nchar', 'nvarchar')
                        THEN t.name + '('
                             + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS NVARCHAR(20)) END
                             + ')'
                    WHEN t.name IN ('decimal', 'numeric')
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ', ' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* fractional-seconds types are parameterised by scale, not
                       by storage size (max_length would give e.g. datetime2(8)) */
                    WHEN t.name IN ('datetime2', 'time', 'datetimeoffset')
                        THEN t.name + '(' + CAST(c.scale AS NVARCHAR(20)) + ')'
                    /* float is parameterised by mantissa bits (precision);
                       max_length would give float(8), which is stored as real */
                    WHEN t.name = 'float'
                        THEN t.name + '(' + CAST(c.precision AS NVARCHAR(20)) + ')'
                    ELSE t.name
                END AS DataType,
                t.name AS TypeName,
                c.is_computed
            FROM sys.columns AS c
            INNER JOIN sys.types AS t
                ON c.user_type_id = t.user_type_id
            WHERE c.object_id = @ObjectID
              AND c.column_id > @MaxColumnID
            ORDER BY c.column_id;

        /* New columns continue after the highest ordinal already registered. */
        SELECT @ColumnOrdinal = MAX(column_ordinal)
        FROM cdc.captured_columns
        WHERE [object_id] = @OriginalCDCObjectID;

        OPEN cColumns;
        FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        WHILE (@@FETCH_STATUS = 0)
        BEGIN
            SELECT @ColumnOrdinal += 1;

            /* Add the column to the original change table; always nullable
               because existing change rows have no value for it. */
            SELECT @SQL = 'ALTER TABLE ' + QUOTENAME(OBJECT_SCHEMA_NAME(@OriginalCDCObjectID)) + '.'
                        + QUOTENAME(OBJECT_NAME(@OriginalCDCObjectID))
                        + ' ADD ' + QUOTENAME(@ColumnName) + ' ' + @ColumnDataType + ' NULL';
            PRINT @SQL;
            EXEC (@SQL);

            /* Register the column in the CDC metadata table cdc.captured_columns. */
            INSERT INTO cdc.captured_columns ([object_id], column_name, column_id, column_type, column_ordinal, is_computed)
            VALUES (@OriginalCDCObjectID, @ColumnName, @ColumnID, @ColumnTypeName, @ColumnOrdinal, @IsComputed);

            FETCH NEXT FROM cColumns INTO @ColumnName, @ColumnID, @ColumnDataType, @ColumnTypeName, @IsComputed;
        END;
        /* Release the cursor so it can be declared again for the next table. */
        CLOSE cColumns;
        DEALLOCATE cColumns;

        /*
         * Step 4: rebuild the original instance's procedures from the
         * temporary instance's definitions. STUFF(..., 1, 6, 'alter')
         * overwrites the first six characters of the definition, which is
         * presumably the keyword CREATE -- verify on your SQL Server version.
         * The two REPLACE calls point the text back at the original change
         * table id and capture instance name.
         */

        /* batch insert procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_batchinsert_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* insert/delete procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_insdel_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* update procedure */
        SELECT @SQL = STUFF(OBJECT_DEFINITION(OBJECT_ID('cdc.sp_upd_' + CAST(@NewCDCObjectID AS NVARCHAR(20)))), 1, 6, 'alter');
        SELECT @SQL = REPLACE(@SQL, CAST(@NewCDCObjectID AS NVARCHAR(20)), CAST(@OriginalCDCObjectID AS NVARCHAR(20)));
        SELECT @SQL = REPLACE(@SQL, @TmpCaptureInstance, @CaptureInstance);
        EXEC (@SQL);

        /* next change table */
        FETCH NEXT FROM CDCObjects INTO @OriginalCDCObjectID, @ObjectID, @MaxColumnID;
    END;
    CLOSE CDCObjects;
    DEALLOCATE CDCObjects;

    /* Step 5: drop the temporary capture instances created above. */
    OPEN TmpCDCObjects;
    FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        /* Use the instance name fetched from the cursor, not the value left
           in @TmpCaptureInstance by the last loop iteration. */
        EXEC sys.sp_cdc_disable_table
            @source_schema = @SchemaName,
            @source_name = @ObjectName,
            @capture_instance = @CaptureInstance;

        FETCH NEXT FROM TmpCDCObjects INTO @SchemaName, @ObjectName, @CaptureInstance;
    END;
    CLOSE TmpCDCObjects;
    DEALLOCATE TmpCDCObjects;
END;

SET NOCOUNT OFF;