匯出管道任務中來源表及對應的目標表資訊

  • 文档创建者:Wendy123456
  • 历史版本:4
  • 最近更新:Nikozhan 于 2025-04-06
  • 1. 概述

    1.1 應用場景

    使用者的管道任務實現上百張表的實時同步,且存在部分來源表和目標表的表名不同、多張來源表同步到一張目標表情況,導致使用者在管理來源表和目標表的對應關係時(例如重構任務)較為困難。

    1.2 實現思路

    使用 fine_dp_pipeline_task 表,獲取管道任務中包含的來源表及對應的目標表資訊:

    注:4.1.8.1 及之後版本,namespace 為 PipelineSourceTable 的派生表更名為 PipelineSourceTableV1;namespace 為 PipelineTargetTable 的派生表更名為 PipelineTargetTableV1。

    • namesapce 為 PipelineSourceTableV1 時,包含着源表資訊。

    • namesapce 為 PipelineTargetTableV1 時,包含着目標表資訊。

    • namesapce 為 DPPipelineFileEntityStore 時,包含着管道任務資訊。

    透過目標表id欄位將源表資訊、目標表資訊聯動,再透過管道任務id欄位將管道任務資訊與之聯動。

    1.3 任務展示

    FineDataLink 中的資料處理程式,詳情參見:https://demo.finedatalink.com/ 「匯出管道任務中來源表及對應的目標表資訊」。

    2. 操作步驟

    本文範例中,FineDB 資料已遷移到 MySQL 外接資料庫中。

    建議使用者先了解下 fine_dp_pipeline_task 表結構與欄位含義。

    2.1 獲取來源表表名、目標表id、管道任務id

    讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 PipelineSourceTableV1 的資料(源表資訊資料),解析 entity_value 欄位,得到來源表名目標表id來源表id欄位;拆分目標表id欄位,獲取管道任務id欄位,便於後續使用管道任務id欄位進行資料聯動。

    2.1.1 獲取來源表表名、目標表id

    1)建立定時任務,拖入「資料轉換」節點,進入「資料轉換」節點。

    2)拖入DB表輸入算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:

    3)拖入「資料過濾」算子,過濾出 namesapce 為 PipelineSourceTableV1 的資料,獲取管道任務中來源表資訊。如下圖所示:

    注:此為 4.1.8.1 及之後版本步驟,4.1.8.1 之前版本,應該過濾出 namesapce 為 PipelineSourceTable 的資料。

    點選「資料預覽」,如下圖所示:

    來源表資訊在 entity_value 欄位中,需要進行解析。

    4)拖入JSON解析算子,解析 entity_value 欄位,獲取源表id、對應目標表 id、源表名稱資料。如下圖所示:

    點選資料預覽,如下圖所示:

    targetTableId(目標表 id)資料,是由管道任務id加密後的目標表名由_連接而成。由於後續需要使用管道任務id來聯動資料,所以需要將管道任務id拆分出來。

    2.1.2 透過拆分目標表id獲取管道任務id

    1)拖入「欄位拆列」算子,將 targetTableId 欄位根據_拆分為兩列。如下圖所示:

    2)拖入欄位設定算子,修改欄位名稱。如下圖所示:

    • id 更名為來源表ID

    • targetTableId更名為目標表ID

    • table更名為來源表名

    • targetTableId_1更名為管道任務ID

    2.2 獲取目標表名及id

    讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 PipelineTargetTableV1 的資料(目標表資訊資料),解析 entity_value 欄位,得到目標表名稱目標表id欄位。

    1)拖入DB表輸入算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:

    2)拖入「資料過濾」算子,過濾出 namesapce 為 PipelineTargetTableV1 的資料,獲取管道任務中目標表資訊。如下圖所示:

    注:此為 4.1.8.1 及之後版本步驟,4.1.8.1 之前版本,應該過濾出 namesapce 為 PipelineTargetTable 的資料。

    目標表資訊在 entity_value 欄位中,需要進行解析。

    3)拖入JSON解析算子,解析 entity_value 欄位,獲取目標表id、目標庫名目標模式名、目標表名稱資料。如下圖所示:

    點選資料預覽,如下圖所示:

    4)拖入欄位設定算子,修改欄位名稱、刪除無用欄位。如下圖所示:

    • id更名為目標表id

    • table更名為目標表名稱

    • 刪除database、schema欄位。

    2.3 聯動來源表資料與目標表資料

    可以使用目標表id欄位,聯動 2.1 節和 2.2 節資料。

    1)拖入「資料聯動」算子,使用目標表id欄位,聯動 2.1 節和 2.2 節資料。如下圖所示:

    2)拖入欄位設定算子,刪除目標表id1targetTableId_2欄位。如下圖所示:

    2.4 獲取管道任務名稱

    1722309622991707.png

    讀取 fine_dp_pipeline_task 表資料,過濾出 namesapce 為 DPPipelineFileEntityStore 的資料(管道任務資訊),解析 entity_value 欄位,得到id、name、type欄位;過濾出 type 欄位值為 ENTITY 的資料,得到管道任務id管道任務名稱節點類型欄位。

    1)拖入DB表輸入算子,讀取 fine_dp_pipeline_task 表資料。如下圖所示:

    19.png

    2)拖入「資料過濾」算子,過濾出 namesapce 為 DPPipelineFileEntityStore 的資料。如下圖所示:

    20.png

    管道任務名稱儲存在 entity_value 欄位中,需要解析 entity_value 欄位。

    3)拖入JSON解析算子,解析 entity_value 欄位,獲取id、name、type欄位。如下圖所示:

    21.png

    當 type 值為 PACKAGE 時,代表資料夾,值為 ENTITY 時,代表管道任務。所以我們需要過濾出 type 值為 ENTITY 的資料。

    4)拖入「資料過濾」算子,過濾出 type 值為 ENTITY 的資料。如下圖所示:

    22.png

    5)拖入欄位設定算子,將 name 欄位更名為管道任務名、id欄位更名為管道任務id、刪除 type 欄位。如下圖所示:

    2.png

    6)由於管道任務id並不能判斷資料唯一(可能存在重複),需要將資料分組匯總下。拖入「Spark SQL」算子,將資料按照管道任務id管道任務名分組匯總。如下圖所示:

    3.png

    2.5 聯動資料

    將 2.3 節、2.4節的資料聯動起來。

    1)拖入「資料聯動」算子,連結欄位選擇管道任務id將 2.3 節、2.4節的資料聯動起來。如下圖所示:

    26.png

    2)拖入「DB表匯出」算子,將最終資料匯出。如下圖所示:

    10.png

    寫入方式選擇追加寫入資料

    2.6 效果查看

    1)運作定時任務成功後,日誌介面如下圖所示:

    11.png

    資料庫資料如下所示:

    25.png

    使用者可使用該表資料,在FR/BI中做範本,透過過濾管道任務名查看管道任務對應的來源表、目標表。

    2)使用者可點選「發佈」按鈕,將定時任務發佈到 生產模式 。如下圖所示:

    11.png






    附件列表


    主题: 系統管理
    • 有帮助
    • 没帮助
    • 只是浏览
    • 评价文档,奖励 1 ~ 100 随机 F 豆!