Pipeline Task Example

  • Last update: February 14, 2025
  • Overview

    This document uses MySQL as an example and synchronizes the inventory, order_detail, and each_cost_record tables from the fdl_test database to the mysql database.

    Procedure

    Preparation

    Prepare an independently deployed FineDataLink project with registered function points related to Data Pipeline.

    Procedure

    Step One: Data Source Configuration

    Select the source and target databases as needed. For details about databases supported by Data Pipeline, see Types of Data Sources Supported by Data Pipeline.

    Establish data connections to source and target databases in Data Connection Management so that you can configure the source and target databases of pipeline tasks by selecting the data source names. For details, see Data Connection Configuration.

    Step Two: Database Environment Preparation

    Grant the account configured in the data connection used in the pipeline task the necessary permission to perform the required operations on the database. For details, see Overview of Database Environment Preparation.

    Step Three: Pipeline Task Environment Preparation

    Deploy Kafka (an open-source event streaming platform) as the middleware. For details, see Kafka Deployment - ZooKeeper Mode and Transmission Queue Configuration.

    Step Four: Pipeline Task Permission Assignment

    Grant the permission to use Data Pipeline to users who are not super admins. For details, see Pipeline Task Management Permission.

    Note:
    For MySQL, SQL Server, or Oracle databases with multiple tables needing real-time synchronization, you are advised to use a single pipeline task to synchronize all these tables in the same database to avoid overloading the database.

    Pipeline Task Creation

    Log in to the FineDataLink project, click Data Pipeline, and create a pipeline task. 

    Pipeline Task Configuration

    Source Selection

    Select the data source and the fdl_demotest data connection. Click Data Source Permission Detection on the right. Ensure the account configured in the data connection has permission to read the data source log.

    Set Synchronization Type to Full + Incremental Synchronization, which will synchronize all inventory data first and then continuously synchronize the changes.
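
    The two phases of Full + Incremental Synchronization can be pictured with a minimal sketch. It is not FineDataLink's implementation: the change-event shape, the function names, and the id and qty columns are hypothetical, and deletions are applied physically here for simplicity (the actual behavior depends on the Data Deletion Strategy chosen in Target Selection).

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]
# Hypothetical change-event shape: (operation, primary key, new row).
Change = Tuple[str, Any, Row]


def full_sync(source_rows: Iterable[Row], key: str) -> Dict[Any, Row]:
    """Phase 1: copy every existing source row into the target, keyed by primary key."""
    return {row[key]: dict(row) for row in source_rows}


def apply_changes(target: Dict[Any, Row], changes: Iterable[Change]) -> None:
    """Phase 2: replay later change events, in order, on top of the full copy."""
    for op, pk, row in changes:
        if op in ("insert", "update"):
            target[pk] = dict(row)
        elif op == "delete":
            target.pop(pk, None)  # physical deletion, for simplicity
        else:
            raise ValueError(f"unknown operation: {op}")


# Existing (inventory) data is copied first.
source = [{"id": 1, "qty": 10}, {"id": 2, "qty": 5}]
target = full_sync(source, key="id")

# Changes that happen afterwards are applied continuously.
apply_changes(target, [
    ("update", 2, {"id": 2, "qty": 7}),
    ("insert", 3, {"id": 3, "qty": 1}),
    ("delete", 1, {}),
])
print(sorted(target))  # primary keys now present in the target
```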

    Select the order_detail, inventory, and each_cost_record tables from Existing Table and add them to Table to Be Synchronized in Synchronization Object.

    Note:
    For details about other settings, see Pipeline Task Configuration - Data Source Selection.

    Target Selection

    For details, see Pipeline Task Configuration - Target Selection.

    1. The Target Selection configuration page is shown in the following figure.

    Procedure: Set Database to mysql.
    Meaning: Synchronize real-time data to the mysql database.

    Procedure: Set Data Deletion Strategy to Logical Deletion at Target End.
    Meaning: Add a boolean field named _fdl_marked_deleted (whose value defaults to false) to the target table to record the deletion status of data in the source table without actually deleting data in the target table. If a data record is deleted from the source table, the system will not physically delete the corresponding record in the target table after synchronization. Instead, it will change the _fdl_marked_deleted value of this record to true.

    Procedure: Enable Mark Timestamp During Synchronization.
    Meaning: Add a long integer field named _fdl_update_timestamp to all target tables to record when the data is added to and updated in the database based on the database time in the form of a millisecond-level timestamp.

    Procedure: Enable Synchronize Source Table Structure Change.
    Meaning: Synchronize changes in the data definition language (DDL) that occurred in the source database, such as deleting tables, adding/deleting/renaming fields, and modifying field types/length/compatibility, to the target end automatically, without the need to modify the target table structure manually.
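
    The effect of Logical Deletion at Target End and Mark Timestamp During Synchronization on a target row can be illustrated with a small sketch. It uses an in-memory SQLite database as a stand-in for the target and is not how FineDataLink writes data. The ProductName column, the sample values Item A and Item B, the helper names, and the choice to refresh the timestamp on a logical deletion are assumptions; the timestamp is also taken from the Python clock rather than the database time.

```python
import sqlite3
import time


def now_ms() -> int:
    """Current time as a millisecond-level timestamp (Python clock, an assumption)."""
    return int(time.time() * 1000)


conn = sqlite3.connect(":memory:")  # stand-in for the target database
conn.execute(
    """
    CREATE TABLE Inventory (
        ProductID INTEGER PRIMARY KEY,
        ProductName TEXT,                                -- hypothetical column
        _fdl_marked_deleted INTEGER NOT NULL DEFAULT 0,  -- boolean, false by default
        _fdl_update_timestamp INTEGER                    -- long integer, milliseconds
    )
    """
)


def upsert(product_id: int, name: str) -> None:
    """Insert or update a row and stamp it with the current time."""
    conn.execute(
        "INSERT OR REPLACE INTO Inventory "
        "(ProductID, ProductName, _fdl_marked_deleted, _fdl_update_timestamp) "
        "VALUES (?, ?, 0, ?)",
        (product_id, name, now_ms()),
    )


def logical_delete(product_id: int) -> None:
    """Mark the row as deleted instead of removing it.

    Refreshing the timestamp here is an assumption of this sketch.
    """
    conn.execute(
        "UPDATE Inventory SET _fdl_marked_deleted = 1, _fdl_update_timestamp = ? "
        "WHERE ProductID = ?",
        (now_ms(), product_id),
    )


upsert(1, "Item A")
upsert(2, "Item B")
logical_delete(1)      # source row deleted: target row is kept and flagged
upsert(2, "Soy Milk")  # source row updated: target row is overwritten

rows = conn.execute(
    "SELECT ProductID, ProductName, _fdl_marked_deleted "
    "FROM Inventory ORDER BY ProductID"
).fetchall()
print(rows)
```

    In this sketch the row with ProductID 1 remains in the table with _fdl_marked_deleted set to 1, so downstream queries that need only live data would filter on that field.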

    2. Click Next.

    Table Field Mapping

    1. Modify the target table name and set the physical primary key for the three target tables sequentially, as shown in the following figure.

    (Optional) Rename the target tables of the each_cost_record, order_detail, and inventory tables to Cost, Order, and Inventory, respectively.

    Because you have enabled Mark Timestamp During Synchronization and set Data Deletion Strategy to Logical Deletion at Target End, the two fields _fdl_update_timestamp and _fdl_marked_deleted will be added to each target table.

    Note:
    You can also set the target table to an existing table and modify target table names in batches in this step. For details about the operation and the primary key configuration, see Pipeline Task Configuration - Table Field Mapping.

    2. Click Next.

    Pipeline Control

    The Pipeline Control configuration page is shown in the following figure.


    Procedure: Set Table Dirty Data Threshold to 1000 Row(s).
    Meaning: Abort the running task when the number of dirty data records reaches 1000.
    Note:
    1. A maximum of 100,000 dirty data rows can be tolerated. The dirty data counting is reset after you restart the task.
    2. For details about dirty data processing, see Dirty Data Processing in Pipeline Task.

    Procedure: Enable Retry After Failure and set it to Retry 3 Times, 2 Minute(s) Apart.
    Meaning: Retry the pipeline task three times with an interval of two minutes each time upon task failure.

    Procedure: Enable Result Notification.
    Meaning: Notify the user by email of structure changes in the source table or of task abortion caused by exceptions or dirty data.

    Procedure: Enable Log Level Setting.
    Meaning: Print a detailed log for users to view. The INFO-level log records the task running status and important events.
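
    The dirty data threshold and the retry setting above can be read as two simple rules, sketched below. This is an illustration of the described behavior, not FineDataLink code: the class and function names are hypothetical, and the sketch makes no claim about how the two settings interact inside the product.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DirtyDataLimitReached(Exception):
    """Raised when the dirty-row count reaches the configured threshold."""


class DirtyDataCounter:
    """Counts dirty rows; creating a new counter models the reset on task restart."""

    def __init__(self, threshold: int = 1000) -> None:
        self.threshold = threshold
        self.count = 0

    def record(self) -> None:
        self.count += 1
        if self.count >= self.threshold:
            raise DirtyDataLimitReached(f"{self.count} dirty rows")


def run_with_retry(
    task: Callable[[], T],
    retries: int = 3,
    interval_seconds: float = 120.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run task; on failure, retry up to `retries` times, waiting between attempts."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= retries:
                raise  # all retries used up: give up
            attempt += 1
            sleep(interval_seconds)


# Threshold demo with a small limit: the third dirty row aborts the loop.
counter = DirtyDataCounter(threshold=3)
aborted = False
try:
    for _ in range(5):
        counter.record()
except DirtyDataLimitReached:
    aborted = True

# Retry demo: the task fails twice, then succeeds; waits are recorded, not slept.
calls = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


waits = []
result = run_with_retry(flaky, sleep=waits.append)
print(aborted, counter.count, result, waits)
```

    Passing sleep as a parameter keeps the sketch testable without actually waiting two minutes between attempts.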

    Effect Display

    1. You can view the number of read and written rows. 

    If dirty data is found during table synchronization, you can process it following the document. For details, see Dirty Data Processing in Pipeline Task.

    2. You can view the three tables in the mysql database.

    The Inventory table (which is the target table of inventory) is shown in the following figure.

    3. If the inventory table in the fdl_test database (the source table) experiences the following changes:

    • The data whose ProductID value is 1 is deleted.

    • The Product name value of the data whose ProductID value is 2 is changed to Soy Milk.

    • A Test field is added.

    The data in the Inventory table in the mysql database (the target table) is shown in the following figure.

    The corresponding user receives an email about source table structure changes, as shown in the following figure.


    Pipeline Task O&M

    Choose O&M Center > Pipeline Task > Task Management, where you can view the task running status and the data synchronization performance, and check and handle exceptions.

    For details, see Batch Pipeline Task O&M.
