1. 概述
1.1 應用場景
使用者的管道任務實現上百張表的實時同步,且存在部分來源表和目標表的表名不同、多張來源表同步到一張目標表等情況,導致使用者在管理來源表和目標表的對應關係時(例如重構任務)較為困難。
1.2 實現思路
使用 fine_dp_pipeline_task 表,獲取管道任務中包含的來源表及對應的目標表資訊:
注:4.1.8.1 及之後版本,namespace 為 PipelineSourceTable 的派生表更名為 PipelineSourceTableV1;namespace 為 PipelineTargetTable 的派生表更名為 PipelineTargetTableV1。
namesapce 為 PipelineSourceTableV1 時,包含着源表資訊。
namesapce 為 PipelineTargetTableV1 時,包含着目標表資訊。
namesapce 為 DPPipelineFileEntityStore 時,包含着管道任務資訊。
透過目標表id欄位將源表資訊、目標表資訊聯動,再透過管道任務id欄位將管道任務資訊與之聯動。
1.3 任務展示
FineDataLink 中的資料處理程式,詳情參見:https://demo.finedatalink.com/ 「匯出管道任務中來源表及對應的目標表資訊」。
2. 操作步驟
本文範例中,FineDB 資料已遷移到 MySQL 外接資料庫中。
建議使用者先了解下 fine_dp_pipeline_task 表結構與欄位含義。
2.1 獲取來源表表名、目標表id、管道任務id
讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 PipelineSourceTableV1 的資料(源表資訊資料),解析 entity_value 欄位,得到來源表名、目標表id、來源表id欄位;拆分目標表id欄位,獲取管道任務id欄位,便於後續使用管道任務id欄位進行資料聯動。
2.1.1 獲取來源表表名、目標表id
1)建立定時任務,拖入「資料轉換」節點,進入「資料轉換」節點。
2)拖入「DB表輸入」算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:
3)拖入「資料過濾」算子,過濾出 namesapce 為 PipelineSourceTableV1 的資料,獲取管道任務中來源表資訊。如下圖所示:
注:此為 4.1.8.1 及之後版本步驟,4.1.8.1 之前版本,應該過濾出 namesapce 為 PipelineSourceTable 的資料。
點選「資料預覽」,如下圖所示:
來源表資訊在 entity_value 欄位中,需要進行解析。
4)拖入「JSON解析」算子,解析 entity_value 欄位,獲取源表id、對應目標表 id、源表名稱資料。如下圖所示:
點選「資料預覽」,如下圖所示:
targetTableId(目標表 id)資料,是由管道任務id和加密後的目標表名由_連接而成。由於後續需要使用管道任務id來聯動資料,所以需要將管道任務id拆分出來。
2.1.2 透過拆分目標表id獲取管道任務id
1)拖入「欄位拆列」算子,將 targetTableId 欄位根據_拆分為兩列。如下圖所示:
2)拖入「欄位設定」算子,修改欄位名稱。如下圖所示:
id 更名為來源表ID。
targetTableId更名為目標表ID。
table更名為來源表名。
targetTableId_1更名為管道任務ID。
2.2 獲取目標表名及id
讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 PipelineTargetTableV1 的資料(目標表資訊資料),解析 entity_value 欄位,得到目標表名稱、目標表id欄位。
1)拖入「DB表輸入」算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:
2)拖入「資料過濾」算子,過濾出 namesapce 為 PipelineTargetTableV1 的資料,獲取管道任務中目標表資訊。如下圖所示:
注:此為 4.1.8.1 及之後版本步驟,4.1.8.1 之前版本,應該過濾出 namesapce 為 PipelineTargetTable 的資料。
目標表資訊在 entity_value 欄位中,需要進行解析。
3)拖入「JSON解析」算子,解析 entity_value 欄位,獲取目標表id、目標庫名、目標模式名、目標表名稱資料。如下圖所示:
點選「資料預覽」,如下圖所示:
4)拖入「欄位設定」算子,修改欄位名稱、刪除無用欄位。如下圖所示:
id更名為目標表id。
table更名為目標表名稱。
刪除database、schema欄位。
2.3 聯動來源表資料與目標表資料
可以使用目標表id欄位,聯動 2.1 節和 2.2 節資料。
1)拖入「資料聯動」算子,使用目標表id欄位,聯動 2.1 節和 2.2 節資料。如下圖所示:
2)拖入「欄位設定」算子,刪除目標表id1、targetTableId_2欄位。如下圖所示:
2.4 獲取管道任務名稱
讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 DPPipelineFileEntityStore 的資料(管道任務資訊),解析 entity_value 欄位,得到id、name、type欄位;過濾出 type 欄位值為 ENTITY 的資料,得到管道任務id、管道任務名稱、節點類型欄位。
1)拖入「DB表輸入」算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:
2)拖入「資料過濾」算子,過濾出 namesapce 為 DPPipelineFileEntityStore 的資料。如下圖所示:
管道任務名稱儲存在 entity_value 欄位中,需要解析 entity_value 欄位。
3)拖入「JSON解析」算子,解析 entity_value 欄位,獲取id、name、type欄位。如下圖所示:
當 type 值為 PACKAGE 時,代表資料夾,值為 ENTITY 時,代表管道任務。所以我們需要過濾出 type 值為 ENTITY 的資料。
4)拖入「資料過濾」算子,過濾出 type 值為 ENTITY 的資料。如下圖所示:
5)拖入「欄位設定」算子,將 name 欄位更名為管道任務名、id欄位更名為管道任務id、刪除 type 欄位。如下圖所示:
6)由於管道任務id並不能判斷資料唯一(可能存在重複),需要將資料分組匯總下。拖入「Spark SQL」算子,將資料按照管道任務id、管道任務名分組匯總。如下圖所示:
2.5 聯動資料
將 2.3 節、2.4節的資料聯動起來。
1)拖入「資料聯動」算子,「連結欄位」選擇管道任務id,將 2.3 節、2.4節的資料聯動起來。如下圖所示:
2)拖入「DB表匯出」算子,將最終資料匯出。如下圖所示:
寫入方式選擇「追加寫入資料」。
2.6 效果查看
1)運作定時任務成功後,日誌介面如下圖所示:
資料庫資料如下所示:
使用者可使用該表資料,在FR/BI中做範本,透過過濾「管道任務名」查看管道任務對應的來源表、目標表。
2)使用者可點選「發佈」按鈕,將定時任務發佈到 生產模式 。如下圖所示: