[SQL SERVER][SSIS]利用CDC實現資料差異增量更新

[SQL SERVER][SSIS]利用CDC實現資料差異增量更新

在ETL資料轉換過程中,為了改善效能和縮短ETL執行時間,

大部分都會利用CDC特性只更新從上次請求後有異動的資料,

前些日子User希望B資料庫中的Userm資料表可以和A資料庫Userm資料表同步,

但又不行清空B資料庫中的Userm資料表資料後再匯入A資料庫Userm資料表資料,

這時只好開啟CDC並搭配SSIS來完成該需求了。

 

已開啟A資料庫中的Userm資料表 CDC功能(可以參考這篇)

零、開啟Userm資料表change tracking

exec sys.sp_cdc_enable_table
 @source_schema='dbo',
 @source_name='userm',
 @role_name='cdc_admin',
 @capture_instance='dbo_userm',
 @supports_net_changes=1

 

 

 

 

 

一、建立cdc紀錄資料表

create table dbo.cdc_capture_log ( 
cdc_capture_log_id int identity not null
, capture_instance nvarchar(50) not null
, start_time datetime not null
, min_lsn binary(10) not null
, max_lsn binary(10) not null
, end_time datetime null 
, status_code int not null default 0)

 

 

 

 

capture_instance:資料表開啟CDC所指定的值。

start_time、end_time:紀錄執行所花的時間。

min_lsn、max_lsn:表示CDC記錄每次更改LSN的範圍。

status_code:當SSIS成功完成時,status_code=1。

 

二、create procedure dbo.usp_init_cdc_capture_log。

create procedure dbo.usp_init_cdc_capture_log 
@capture_instance nvarchar(50)
as 
begin 
set nocount on; 
declare 
@start_lsn binary(10)
, @end_lsn binary(10)
, @prev_max_lsn binary(10)
--get the max LSN for the capture instance from 
--the last extract
select @prev_max_lsn = max(max_lsn)
from dbo.cdc_capture_log 
where capture_instance = @capture_instance 
-- if no row found in cdc_capture_log get the min lsn 
-- for the capture instance 
if @prev_max_lsn is null
set @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
else 
set @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn) 
-- get the max lsn 
set @end_lsn = sys.fn_cdc_get_max_lsn()
insert into dbo.cdc_capture_log 
(capture_instance,start_time,min_lsn,max_lsn)
values 
(@capture_instance,getdate(),@start_lsn,@end_lsn) 
select cast(scope_identity() as int) cdc_capture_log_id 
end 

 

 

 

 

該SP幫助我們取得上次所執行的max_lsn,如果有找到資料就呼叫sys.fn.cdc_increment_lsn並設定@start_lsn,

否則就呼叫sys.fn_cdc_get_min_lsn並取得lsn,最後會返回cdc_capture_log_id,

後續將利用cdc_capture_log_id來執行相關資料更新。

 

create procedure dbo.usp_end_cdc_capture_log

create procedure dbo.usp_end_cdc_capture_log
@cdc_capture_log_id int
as
begin
set nocount on;
update dbo.cdc_capture_log set
end_time = getdate(), 
status_code = 1
where cdc_capture_log_id = @cdc_capture_log_id
end

 

 

 

 

該SP幫助我們更新cdc_capture_log資料表的結束時間和狀態。

 

create procedure dbo.usp_extract_userm_capture_log

create procedure dbo.usp_extract_userm_capture_log 
@cdc_capture_log_id int
as
begin
set nocount on;
declare
@start_lsn binary(10)
,@end_lsn binary(10)
-- get the lsn range to process
select
@start_lsn = min_lsn
,@end_lsn = max_lsn
from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id
-- extract and return the changes
select m.tran_end_time modified_ts, x.*
from cdc.fn_cdc_get_net_changes_dbo_userm (
@start_lsn, @end_lsn, 'all'
) x
join cdc.lsn_time_mapping m
on m.start_lsn = x.__$start_lsn ;
end

 

 

 

 

 

該SP幫助我們取得lsn範圍(透過@cdc_capture_log_id查詢cdc_capture_log資料表),

並透過呼叫cdc.fn_cdc_get_net_changes_dbo_userm和cdc.lsn_time_mapping來取得lsn範圍中發生的所有資料變更。

 

三、設計SSIS控制流程和資料流程

image

 

編輯Exec usp_init_cdc_capture_log

image

image

編輯sqlstatement:exec dbo.usp_init_cdc_capture_log 'dbo_userm'。

 

image

image

編輯結果集:選擇User::capture_log_id變數(請先建立該變數)。

 

編輯資料流程

image

 

編輯OLE DB來源

image image

請記得設定參數和變數的對應。

 

編輯條件式分割

image

4:Update。2:Insert。1:Delete。

 

編輯update(oledb命令)

image

image

請小心參數和資料行的對應。

 

編輯insert

image

這裡就直接設定目的地資料庫的資料表。

 

開始測試

image

來源資料表。

 

執行delete、insert、update

delete ASSET.dbo.userm where NAME in ('rico1','rico2','rico3');

		
insert into ASSET.dbo.userm 
select 'ricoisme1',101,'02-28825252' union all
select 'ricoisme2',101,'02-28825252' union all
select 'ricoisme3',101,'02-28825252' 

		
update ASSET.dbo.userm 
set ZIPCODE=119
where NAME='ricoisme2'

		
delete ASSET.dbo.userm where NAME='rico'

 

image

 

執行SSIS

image

image

 

確認兩邊資料表是否相同

image

資料相同。

 

在一次重複如上操作

刪除資料

image

刪除一筆資料。

 

執行SSIS

image

image

果然只有一筆刪除資料。

 

確認兩邊資料表是否相同

image

資料相同。

 

參考

利用異動資料擷取改善累加式載入

How To Process Change Data Capture (CDC) in SQL Server Integration Services SSIS 2008