[SQL SERVER][SSIS]利用CDC實現資料差異增量更新
在ETL資料轉換過程中,為了改善效能和縮短ETL執行時間,
大部分都會利用CDC特性只更新從上次請求後有異動的資料,
前些日子User希望B資料庫中的Userm資料表可以和A資料庫Userm資料表同步,
但又不行清空B資料庫中的Userm資料表資料後再匯入A資料庫Userm資料表資料,
這時只好開啟CDC並搭配SSIS來完成該需求了。
已開啟A資料庫中的Userm資料表 CDC功能(可以參考這篇)
零、開啟Userm資料表change tracking
exec sys.sp_cdc_enable_table
@source_schema='dbo',
@source_name='userm',
@role_name='cdc_admin',
@capture_instance='dbo_userm',
@supports_net_changes=1
一、建立cdc紀錄資料表
create table dbo.cdc_capture_log (
cdc_capture_log_id int identity not null
, capture_instance nvarchar(50) not null
, start_time datetime not null
, min_lsn binary(10) not null
, max_lsn binary(10) not null
, end_time datetime null
, status_code int not null default 0)
capture_instance:資料表開啟CDC所指定的值。
start_time、end_time:紀錄執行所花的時間。
min_lsn、max_lsn:表示CDC記錄每次更改LSN的範圍。
status_code:當SSIS成功完成時,status_code=1。
二、create procedure dbo.usp_init_cdc_capture_log。
create procedure dbo.usp_init_cdc_capture_log
@capture_instance nvarchar(50)
as
begin
set nocount on;
declare
@start_lsn binary(10)
, @end_lsn binary(10)
, @prev_max_lsn binary(10)
--get the max LSN for the capture instance from
--the last extract
select @prev_max_lsn = max(max_lsn)
from dbo.cdc_capture_log
where capture_instance = @capture_instance
-- if no row found in cdc_capture_log get the min lsn
-- for the capture instance
if @prev_max_lsn is null
set @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
else
set @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
-- get the max lsn
set @end_lsn = sys.fn_cdc_get_max_lsn()
insert into dbo.cdc_capture_log
(capture_instance,start_time,min_lsn,max_lsn)
values
(@capture_instance,getdate(),@start_lsn,@end_lsn)
select cast(scope_identity() as int) cdc_capture_log_id
end
該SP幫助我們取得上次所執行的max_lsn,如果有找到資料就呼叫sys.fn.cdc_increment_lsn並設定@start_lsn,
否則就呼叫sys.fn_cdc_get_min_lsn並取得lsn,最後會返回cdc_capture_log_id,
後續將利用cdc_capture_log_id來執行相關資料更新。
create procedure dbo.usp_end_cdc_capture_log
create procedure dbo.usp_end_cdc_capture_log
@cdc_capture_log_id int
as
begin
set nocount on;
update dbo.cdc_capture_log set
end_time = getdate(),
status_code = 1
where cdc_capture_log_id = @cdc_capture_log_id
end
該SP幫助我們更新cdc_capture_log資料表的結束時間和狀態。
create procedure dbo.usp_extract_userm_capture_log
create procedure dbo.usp_extract_userm_capture_log
@cdc_capture_log_id int
as
begin
set nocount on;
declare
@start_lsn binary(10)
,@end_lsn binary(10)
-- get the lsn range to process
select
@start_lsn = min_lsn
,@end_lsn = max_lsn
from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id
-- extract and return the changes
select m.tran_end_time modified_ts, x.*
from cdc.fn_cdc_get_net_changes_dbo_userm (
@start_lsn, @end_lsn, 'all'
) x
join cdc.lsn_time_mapping m
on m.start_lsn = x.__$start_lsn ;
end
該SP幫助我們取得lsn範圍(透過@cdc_capture_log_id查詢cdc_capture_log資料表),
並透過呼叫cdc.fn_cdc_get_net_changes_dbo_userm和cdc.lsn_time_mapping來取得lsn範圍中發生的所有資料變更。
三、設計SSIS控制流程和資料流程
編輯Exec usp_init_cdc_capture_log
編輯sqlstatement:exec dbo.usp_init_cdc_capture_log 'dbo_userm'。
編輯結果集:選擇User::capture_log_id變數(請先建立該變數)。
編輯資料流程
編輯OLE DB來源
請記得設定參數和變數的對應。
編輯條件式分割
4:Update。2:Insert。1:Delete。
編輯update(oledb命令)
請小心參數和資料行的對應。
編輯insert
這裡就直接設定目的地資料庫的資料表。
開始測試
來源資料表。
執行delete、insert、update
delete ASSET.dbo.userm where NAME in ('rico1','rico2','rico3');
insert into ASSET.dbo.userm
select 'ricoisme1',101,'02-28825252' union all
select 'ricoisme2',101,'02-28825252' union all
select 'ricoisme3',101,'02-28825252'
update ASSET.dbo.userm
set ZIPCODE=119
where NAME='ricoisme2'
delete ASSET.dbo.userm where NAME='rico'
執行SSIS
確認兩邊資料表是否相同
資料相同。
在一次重複如上操作
刪除資料
刪除一筆資料。
執行SSIS
果然只有一筆刪除資料。
確認兩邊資料表是否相同
資料相同。
參考
How To Process Change Data Capture (CDC) in SQL Server Integration Services SSIS 2008