1. 概述
1.1 应用场景
用户的管道任务实现上百张表的实时同步,且存在部分来源表和目标表的表名不同、多张来源表同步到一张目标表等情况,导致用户在管理来源表和目标表的对应关系时(例如重构任务)较为困难。
1.2 实现思路
使用 fine_dp_pipeline_task 表,获取管道任务中包含的来源表及对应的目标表信息:
注:4.1.8.1 及之后版本,namespace 为 PipelineSourceTable 的派生表更名为 PipelineSourceTableV1;namespace 为 PipelineTargetTable 的派生表更名为 PipelineTargetTableV1。
namesapce 为 PipelineSourceTableV1 时,包含着源表信息。
namesapce 为 PipelineTargetTableV1 时,包含着目标表信息。
namesapce 为 DPPipelineFileEntityStore 时,包含着管道任务信息。
通过目标表id字段将源表信息、目标表信息关联,再通过管道任务id字段将管道任务信息与之关联。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「导出管道任务中来源表及对应的目标表信息」。
2. 操作步骤
本文示例中,FineDB 数据已迁移到 MySQL 外接数据库中。
建议用户先了解下 fine_dp_pipeline_task 表结构与字段含义。
2.1 获取来源表表名、目标表id、管道任务id
读取 fine_dp_pipeline_task 表数据,过滤出 namesapce 为 PipelineSourceTableV1 的数据(源表信息数据),解析 entity_value 字段,得到来源表名、目标表id、来源表id字段;拆分目标表id字段,获取管道任务id字段,便于后续使用管道任务id字段进行数据关联。
2.1.1 获取来源表表名、目标表id
1)新建定时任务,拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,读取 fine_dp_pipeline_task 表数据。如下图所示:
3)拖入「数据过滤」算子,过滤出 namesapce 为 PipelineSourceTableV1 的数据,获取管道任务中来源表信息。如下图所示:
注:此为 4.1.8.1 及之后版本步骤,4.1.8.1 之前版本,应该过滤出 namesapce 为 PipelineSourceTable 的数据。
点击「数据预览」,如下图所示:
来源表信息在 entity_value 字段中,需要进行解析。
4)拖入「JSON解析」算子,解析 entity_value 字段,获取源表id、对应目标表 id、源表名称数据。如下图所示:
点击「数据预览」,如下图所示:
targetTableId(目标表 id)数据,是由管道任务id和加密后的目标表名由_拼接而成。由于后续需要使用管道任务id来关联数据,所以需要将管道任务id拆分出来。
2.1.2 通过拆分目标表id获取管道任务id
1)拖入「字段拆列」算子,将 targetTableId 字段根据_拆分为两列。如下图所示:
2)拖入「字段设置」算子,修改字段名称。如下图所示:
id 更名为来源表ID。
targetTableId更名为目标表ID。
table更名为来源表名。
targetTableId_1更名为管道任务ID。
2.2 获取目标表名及id
读取 fine_dp_pipeline_task 表数据,过滤出 namesapce 为 PipelineTargetTableV1 的数据(目标表信息数据),解析 entity_value 字段,得到目标表名称、目标表id字段。
1)拖入「DB表输入」算子,读取 fine_dp_pipeline_task 表数据。如下图所示:
2)拖入「数据过滤」算子,过滤出 namesapce 为 PipelineTargetTableV1 的数据,获取管道任务中目标表信息。如下图所示:
注:此为 4.1.8.1 及之后版本步骤,4.1.8.1 之前版本,应该过滤出 namesapce 为 PipelineTargetTable 的数据。
目标表信息在 entity_value 字段中,需要进行解析。
3)拖入「JSON解析」算子,解析 entity_value 字段,获取目标表id、目标库名、目标模式名、目标表名称数据。如下图所示:
点击「数据预览」,如下图所示:
4)拖入「字段设置」算子,修改字段名称、删除无用字段。如下图所示:
id更名为目标表id。
table更名为目标表名称。
删除database、schema字段。
2.3 关联来源表数据与目标表数据
可以使用目标表id字段,关联 2.1 节和 2.2 节数据。
1)拖入「数据关联」算子,使用目标表id字段,关联 2.1 节和 2.2 节数据。如下图所示:
2)拖入「字段设置」算子,删除目标表id1、targetTableId_2字段。如下图所示:
2.4 获取管道任务名称
读取 fine_dp_pipeline_task 表数据,过滤出 namesapce 为 DPPipelineFileEntityStore 的数据(管道任务信息),解析 entity_value 字段,得到id、name、type字段;过滤出 type 字段值为 ENTITY 的数据,得到管道任务id、管道任务名称、节点类型字段。
1)拖入「DB表输入」算子,读取 fine_dp_pipeline_task 表数据。如下图所示:
2)拖入「数据过滤」算子,过滤出 namesapce 为 DPPipelineFileEntityStore 的数据。如下图所示:
管道任务名称保存在 entity_value 字段中,需要解析 entity_value 字段。
3)拖入「JSON解析」算子,解析 entity_value 字段,获取id、name、type字段。如下图所示:
当 type 值为 PACKAGE 时,代表文件夹,值为 ENTITY 时,代表管道任务。所以我们需要过滤出 type 值为 ENTITY 的数据。
4)拖入「数据过滤」算子,过滤出 type 值为 ENTITY 的数据。如下图所示:
5)拖入「字段设置」算子,将 name 字段更名为管道任务名、id字段更名为管道任务id、删除 type 字段。如下图所示:
6)由于管道任务id并不能判断数据唯一(可能存在重复),需要将数据分组汇总下。拖入「Spark SQL」算子,将数据按照管道任务id、管道任务名分组汇总。如下图所示:
2.5 关联数据
将 2.3 节、2.4节的数据关联起来。
1)拖入「数据关联」算子,「连接字段」选择管道任务id,将 2.3 节、2.4节的数据关联起来。如下图所示:
2)拖入「DB表输出」算子,将最终数据输出。如下图所示:
写入方式选择「追加写入数据」。
2.6 效果查看
1)运行定时任务成功后,日志界面如下图所示:
数据库数据如下所示:
用户可使用该表数据,在FR/BI中做模板,通过过滤「管道任务名」查看管道任务对应的来源表、目标表。
2)用户可点击「发布」按钮,将定时任务发布到 生产模式 。如下图所示: