1. 概述
1.1 問題描述
使用者在定時任務進行資料同步時,如果源資料庫同時有增刪改變化,且來源表有時間戳時,可以使用数据比对,但是如果資料量很大,想要更好的提高同步效率,就可以使用本文的方案進行資料同步。
注:時間戳一般是記錄資料的增刪改時間。
1.2 實現思路
透過兩次資料更新的時間進行增刪改的同步。
1)同步刪除資料:透過比對主鍵,刪除來源表中被物理刪除但是目標表中還存在的資料。
2)使用時間戳同步新增和修改的資料:
獲取上次同步的時候,來源表的最大時間戳,並設定為參數 cur_timestamp。
獲取這次同步的時候,來源表的最大時間戳,並設定為參數 tmp_timestamp。
獲取來源表相對目標表的增量資料,即 order_timestamp(來源表業務時間戳)>cur_timestamp 的資料,並將增加和更新的資料同步到目標表。
3)更新時間表同步時間戳和執行記錄。
demo 範例詳情請參見:https://demo.finedatalink.com/ 增刪改資料同步(非資料比對且有時間戳)
2. 操作步驟
例如來源表中刪除 1002 的資料,同時修改了 1003 的 order_quantiny 的資料,同時新增了 1005 的資料,需要將變化同步至目標表。
其中 order_business_date 為業務時間,也就是訂單交易時間,order_timestamp 為時間戳,即記錄資料增刪改的時間。
2.1 前提條件
建立執行記錄資料表,便於獲取每次同步的時間戳,如下圖所示:
注:兩個時間欄位初始值可以設定為一個比較早的時間,早於記錄資料更新時間戳,以便初次同步全量資料。
2.2 同步來源表中刪除的資料
透過比對主鍵,刪除來源表中被物理刪除但是目標表中還存在的資料。
建立定時任務後,使用「資料轉換」節點,進入編輯介面,使用DB表輸入功能,獲取來源表中主鍵 order_id_from,如下圖所示:
SELECT order_id AS order_id_from
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
使用DB匯出算子,獲取目標資料表的主鍵,如下圖所示:
SELECT order_id AS order_id_target
FROM `demotest`.`F_ORDER_TIMESTAMP_TARGET`
使用「資料比對」算子,比對來源表和目標表的 order_id 主鍵,獲取來源表刪除但是目標表還存在的資料,如下圖所示:
查看比對結果即可看到來源端刪除的資料 1002 被標識出來,如下圖所示:
使用 DB 匯出算子將比對結果源表刪除的資料從目標表中刪除,設定寫入方式為「插入更新刪除資料」,只勾選「刪除」,邏輯主鍵選擇「order_id」,如下圖所示:
2.3 使用時間戳同步新增和修改的資料
使用「參數指派」獲取上次同步的時候,來源表記錄資料增刪改的最大時間戳,如下圖所示:
SELECT cur_timestamp
FROM `demotest`.`ETL_RECORD`
WHERE table_name='F_ORDER_TIMESTAMP_FROM'
並設定為參數 cur_timestamp,如下圖所示:
獲取這次同步的時候,來源表的最大時間戳,如下圖所示:
SELECT MAX(order_timestamp) AS tmp_timestamp
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
並設定為參數 tmp_timestamp,如下圖所示:
獲取來源表相對目標表的增量資料,即 order_timestamp(源表記錄資料增刪改的時間戳)>cur_timestamp 的資料,並將增量資料同步到目標表。
新增資料轉換節點,進入編輯介面,使用DB輸入算子,取出增量資料並將當前的時間作為 etl_time,便於後續寫入 2.1 節的記錄資料表,如下圖所示:
SELECT
*,
NOW() AS etl_time
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
WHERE order_timestamp>'${cur_timestamp}'
將 order_timestamp(來源表業務時間戳)>cur_timestamp 的增加和更新的資料同步到目標表,如下圖所示:
2.4 更新時間表同步時間戳和執行記錄
為了便於下一次使執行任務時使用時間戳,因此需要將本次同步時的最大時間戳寫入 2.1 節建立的執行記錄表中,同時將上次同步記錄時間寫入 pre_timestamp 欄位,etl執行次數寫入 etl_time,如下圖所示:如下圖所示:
UPDATE `demotest`.`ETL_RECORD`
SET
cur_timestamp='${tmp_timestamp}',
pre_timestamp='${cur_timestamp}',
etl_times=etl_times+1,
etl_last_time=NOW()
WHERE table_name='F_ORDER_TIMESTAMP_FROM'
2.5 效果查看
儲存並運作任務後,即可在資料庫中看到增刪改資料同步到目標表,如下圖所示:
同時記錄表 ETL_RECORD 結果如下圖所示: