Facilitate the management of mapping relationships between source tables and target tables. (A pipeline task may synchronize hundreds of tables in real time; some source and target tables may have different names, and multiple source tables may be synchronized to the same target table. This makes the mappings between source and target tables difficult to manage, for example, during task refactoring.)
Using the FINE_DP_PIPELINE_TASK table, retrieve the source tables and corresponding target table information included in the pipeline tasks:
When the NAMESPACE is PipelineSourceTableV1, it contains source table information.
When the NAMESPACE is PipelineTargetTableV1, it contains target table information.
When the NAMESPACE is DPPipelineFileEntityStore, it contains pipeline task information.
Source table information is associated with target table information through the target table ID field; the result is then linked to pipeline task information using the pipeline task ID field.
In this example, FineDB data has been migrated to an external MySQL database.
It is recommended to first familiarize yourself with the structure and field definitions of the FINE_DP_PIPELINE_TASK table.
Read data from the FINE_DP_PIPELINE_TASK table and filter for rows where the NAMESPACE is PipelineSourceTableV1 (source table information). Parse the ENTITY_VALUE field to obtain the source table name, target table ID, and source table ID. Then split the targetTableId field to extract the Pipeline_Task_ID value, which is used later for data association.
1. Create a scheduled task, drag a Data Transformation node onto the page, and enter the Data Transformation editing page.
2. Drag a DB Table Input operator and configure it to read data from the FINE_DP_PIPELINE_TASK table, as shown in the following figure.
3. Drag a Data Filtering operator and configure it to filter rows where the NAMESPACE is PipelineSourceTableV1 to obtain source table information in the pipeline task, as shown in the following figure.
Note: Click Data Preview, as shown in the following figure.
The source table information is in the ENTITY_VALUE field and needs to be parsed.
4. Drag a JSON Parsing operator and configure it to parse the ENTITY_VALUE field and obtain the source table ID, corresponding target table ID, and source table name data, as shown in the following figure.
The targetTableId (target table ID) field is composed of the pipeline task ID and the encrypted target table name, concatenated with an underscore (_). Since the pipeline task ID will be needed for data association later, it must be split out.
1. Drag a Field-to-Column Splitting operator and configure it to split the targetTableId field into two columns based on the underscore (_), as shown in the following figure.
2. Drag a Field Setting operator and configure it to rename the fields, as shown in the following figure.
Rename ID to Source_Table_ID.
Rename targetTableId to Target_Table_ID.
Rename table to Source_Table_Name.
Rename targetTableId_1 to Pipeline_Task_ID.
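The parse-split-rename logic of steps 3 and 4 can be sketched in plain Python. The payload below is a made-up example (the actual field names in ENTITY_VALUE may differ across FineDB versions), and the split assumes the pipeline task ID itself contains no underscore:

```python
import json

# Hypothetical ENTITY_VALUE payload of a PipelineSourceTableV1 row;
# the real key names may differ in your FineDB version.
entity_value = json.dumps({
    "id": "src-001",
    "table": "ods_orders",
    "targetTableId": "task42_9f8e7d",  # <pipeline task ID>_<encrypted target table name>
})

row = json.loads(entity_value)

# Split targetTableId on the first underscore to recover the pipeline task ID.
pipeline_task_id, encrypted_name = row["targetTableId"].split("_", 1)

# Apply the same renames as the Field Setting operator.
record = {
    "Source_Table_ID": row["id"],
    "Source_Table_Name": row["table"],
    "Target_Table_ID": row["targetTableId"],
    "Pipeline_Task_ID": pipeline_task_id,
}
```

In the scheduled task itself, the JSON Parsing, Field-to-Column Splitting, and Field Setting operators perform these three steps for every row.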
Read data from the FINE_DP_PIPELINE_TASK table and filter for rows where the NAMESPACE is PipelineTargetTableV1 (target table information). Parse the ENTITY_VALUE field to extract the target table name and target table ID.
1. Drag a DB Table Input operator and configure it to read data from the FINE_DP_PIPELINE_TASK table, as shown in the following figure.
2. Drag a Data Filtering operator and configure it to filter rows where the NAMESPACE is PipelineTargetTableV1 to obtain target table information in the pipeline task, as shown in the following figure.
The target table information is stored in the ENTITY_VALUE field and needs to be parsed.
3. Drag a JSON Parsing operator and configure it to parse the ENTITY_VALUE field and obtain the target table ID, target database name, target schema name, and target table name, as shown in the following figure.
4. Drag a Field Setting operator and configure it to rename fields and remove unnecessary fields, as shown in the following figure.
Rename id to Target_Table_ID.
Rename table to Target_Table_Name.
Delete the database and schema fields.
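The target-side parse and rename can be sketched the same way. Again, the payload is illustrative and the key names are assumptions, not the guaranteed FineDB schema:

```python
import json

# Hypothetical ENTITY_VALUE payload of a PipelineTargetTableV1 row (key names assumed).
entity_value = json.dumps({
    "id": "task42_9f8e7d",
    "database": "dw",
    "schema": "public",
    "table": "dim_orders",
})

row = json.loads(entity_value)

# Keep only the renamed fields; database and schema are dropped.
record = {
    "Target_Table_ID": row["id"],
    "Target_Table_Name": row["table"],
}
```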
Use the target table ID field to associate data from sections "Retrieving Source Table Name, Target Table ID, and Pipeline Task ID" and "Retrieving Target Table Name and ID".
1. Drag a Data Association operator and configure it to associate the data from the sections "Retrieving Source Table Name, Target Table ID, and Pipeline Task ID" and "Retrieving Target Table Name and ID" using the target table ID field, as shown in the following figure.
2. Drag a Field Setting operator and configure it to delete the Target_Table_ID1 and targetTableId_2 fields, as shown in the following figure.
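The Data Association step amounts to an inner join of the two result sets on the target table ID. A minimal sketch with made-up sample rows:

```python
# Rows produced by the source-side branch (illustrative values).
source_rows = [
    {"Source_Table_Name": "ods_orders", "Target_Table_ID": "task42_9f8e7d",
     "Pipeline_Task_ID": "task42"},
]
# Rows produced by the target-side branch.
target_rows = [
    {"Target_Table_ID": "task42_9f8e7d", "Target_Table_Name": "dim_orders"},
]

# Index target rows by Target_Table_ID, then inner-join.
targets = {t["Target_Table_ID"]: t for t in target_rows}
joined = [
    {**s, "Target_Table_Name": targets[s["Target_Table_ID"]]["Target_Table_Name"]}
    for s in source_rows
    if s["Target_Table_ID"] in targets
]
```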
Read data from the FINE_DP_PIPELINE_TASK table and filter for rows where the NAMESPACE is DPPipelineFileEntityStore (pipeline task information). Parse the ENTITY_VALUE field to extract the id, name, and type fields. Then filter for rows where the type field value is ENTITY to obtain the pipeline task ID, pipeline task name, and node type.
1. Drag a DB Table Input operator and configure it to read data from the FINE_DP_PIPELINE_TASK table.
2. Drag a Data Filtering operator and configure it to filter for rows where the NAMESPACE is DPPipelineFileEntityStore, as shown in the following figure.
The pipeline task name is stored in the ENTITY_VALUE field, which needs to be parsed.
3. Drag a JSON Parsing operator and configure it to parse the ENTITY_VALUE field and obtain the ID, name, and type, as shown in the following figure.
When the type value is PACKAGE, it represents a folder; when the value is ENTITY, it represents a pipeline task. Therefore, you need to retain only records where the type value is ENTITY.
4. Drag a Data Filtering operator and configure it to filter for rows where the type value is ENTITY, as shown in the following figure.
5. Drag a Field Setting operator, rename the name field to Pipeline_Task_Name, rename the id field to Pipeline_Task_ID, and delete the type field, as shown in the following figure.
6. Because the pipeline task ID may not be unique (duplicates may exist), the data needs to be grouped and aggregated. Drag a Spark SQL operator and configure it to group and aggregate the data by Pipeline_Task_ID and Pipeline_Task_Name, as shown in the following figure.
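The deduplicating aggregation in step 6 boils down to a GROUP BY on both columns. The sketch below runs an equivalent statement against an in-memory SQLite database purely for illustration (the actual operator executes Spark SQL, and the sample rows are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (Pipeline_Task_ID TEXT, Pipeline_Task_Name TEXT)")
# Simulate duplicate rows for the same pipeline task ID.
conn.executemany("INSERT INTO tasks VALUES (?, ?)",
                 [("task42", "orders_sync"), ("task42", "orders_sync")])

# Grouping by both columns collapses the duplicates to one row per task.
rows = conn.execute(
    "SELECT Pipeline_Task_ID, Pipeline_Task_Name "
    "FROM tasks GROUP BY Pipeline_Task_ID, Pipeline_Task_Name"
).fetchall()
```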
Associate Data from the “Associating Source Table Data with Target Table Data” Section with the “Retrieving Pipeline Task Names” Section.
1. Drag a Data Association operator and configure it. Select Pipeline_Task_ID as the Join Field to associate the data from the “Associating Source Table Data with Target Table Data” section with the data from the “Retrieving Pipeline Task Names” section, as shown in the following figure.
2. Drag a DB Table Output operator onto the page and configure the operator, as shown in the following figure.
Set Write Method to Write Data into Target Table Directly.
1. After successfully running the scheduled task, the log page is displayed as shown in the following figure.
The table data in the database is shown in the following figure.
You can use this table data in FineReport/FineBI to create templates. By filtering on the "Pipeline Task Name," you can view the corresponding source and target tables for each pipeline task.
2. Click the Publish button to publish the scheduled task to Production Mode, as shown in the following figure.