Pipeline Task - Kafka

  • Last update: September 25, 2024
  • Overview 

    Version 

FineDataLink Version | Functional Change
4.0.20.1 | Supported Apache Kafka versions 0.10.2 through 3.4.
4.0.27 | Optimized the interactive experience.

    Application Scenario 

You want to synchronize data stored in Kafka to a database in real time and use the fetched data.

    Function Description 

    FineDataLink's Data Pipeline supports real-time data synchronization from Kafka to the specified database by configuring the Kafka consumer.

FineDataLink tries to convert the strings read from Kafka into JSON objects. Successfully converted data is considered valid and is parsed into a two-dimensional table. Data that cannot be converted is skipped and is not treated as dirty data. Skipped data does not affect task running and is not counted in the read or output rows, but it triggers error messages similar to "value is not json format" in the log.
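The parse-or-skip behavior can be pictured with a short Python sketch (illustrative only; FineDataLink performs this internally, and the function name below is hypothetical):

```python
import json
import logging

def parse_message_value(value: str):
    """Return the JSON object for a valid message value, or None to skip it."""
    try:
        return json.loads(value)  # valid JSON is parsed and kept
    except json.JSONDecodeError:
        # Skipped values are not counted as dirty data and do not appear in
        # the read/output row counts; only a log message is produced.
        logging.warning("value is not json format: %r", value)
        return None
```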

    Prerequisite 

    Connect FineDataLink to Kafka before synchronizing real-time data from Kafka. For details, see Kafka Data Connection.
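As a quick sanity check before configuring the task, you can verify that the broker configured in your Kafka data connection is reachable, for example with the third-party kafka-python package (a sketch; the broker address is a placeholder for your own):

```python
from kafka import KafkaConsumer

# Placeholder address: use the bootstrap server from your Kafka data connection.
consumer = KafkaConsumer(bootstrap_servers="kafka.example.com:9092")
print(sorted(consumer.topics()))  # topics visible to this connection
consumer.close()
```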

    Procedure 

    Source Selection 

    Choose Data Pipeline > New > Pipeline Task, as shown in the following figure.

    Select the data to be synchronized.

Select Kafka as the data source and select the desired data connection. Set Return Value Format to JSON to convert the strings read from Kafka into JSON objects. Successfully converted data is parsed into a two-dimensional table.

Select the objects to be synchronized, as shown in the following figure.

The configuration items in the source selection step are described as follows.

Configuration Item | Description
Data Source | The drop-down list contains the data sources connected via data connections on which you have Use permission.
Data Connection | The drop-down list contains all data connections to the selected data source.
Return Value Format | Defaults to JSON. Standard single-level JSON data is parsed into a two-dimensional table; for example, {"id": 1, "name": "Alice"} is parsed into one row with columns id and name (see the sketch after this table). Note: Parsing multi-level JSON data is not supported.
Synchronization Object | You can select any topics in Kafka, with up to 5,000 topics in a single task.
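To make the JSON-to-table rule concrete, here is a minimal sketch of how a single-level JSON object becomes one table row, and why multi-level JSON is rejected (illustrative only; not FineDataLink code):

```python
import json

def json_to_row(value: str) -> dict:
    """Parse a single-level JSON object into one flat table row."""
    obj = json.loads(value)
    if not isinstance(obj, dict):
        raise ValueError("expected a JSON object")
    # Nested objects or arrays would make the row multi-level, which is unsupported.
    if any(isinstance(v, (dict, list)) for v in obj.values()):
        raise ValueError("multi-level JSON data is not supported")
    return obj

print(json_to_row('{"id": 1, "name": "Alice"}'))  # {'id': 1, 'name': 'Alice'}
```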

    Target Selection 

Set the target database into which the real-time data is written, for example, a MySQL database, and select Physical Deletion at Target End and Mark Timestamp During Synchronization, as shown in the following figure.

Note: For details about the timestamp and the logical deletion identifier, see Pipeline Task Configuration - Target Selection.



    Table Field Mapping 

    Click Next to enter the field mapping page. You can set the target table name and map source fields to target fields.

Note: The physical primary key of the table in the target database cannot be null, to ensure the uniqueness of the written data.

Fields in the target table are generated by parsing the data that has been read from Kafka and converted into JSON.

    You can set the target table name and the effective primary key, as shown in the following figure.

Note: The value of _fdl_key in the target table may be null, so no default primary key is set. Set the primary key yourself.

    Fields of the target table are described as follows.

Fetched Field | Field Description | Field Type in FineDataLink
_fdl_key | The key of the message | STRING
Fields parsed from the JSON data | The two-dimensional table data derived from deserializing the message body | Field types are obtained from the two-dimensional table according to the table structure.
_fdl_topic | The topic of the message | STRING
_fdl_partition | The partition in which the message is located | INTEGER
_fdl_offset | The offset of the message | LONG
_fdl_timestamp | The timestamp of the message | LONG

For example, if the JSON data parsed from a string read from Kafka is {"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}, the data will be parsed into id, text, and date fields in the target table, as shown in the following figure.

Note: Only the fields and types of the first 5,000 rows are fetched currently. Content beyond that is not displayed on the Table Field Mapping page.
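The shape of a written row can be pictured as the parsed JSON fields plus the _fdl_* metadata columns. The following sketch builds such a row from a hypothetical consumed message (all values are made up for illustration):

```python
import json

# A hypothetical consumed Kafka message.
message = {
    "key": None,  # _fdl_key may be null, which is why it is not the default primary key
    "value": '{"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}',
    "topic": "orders",
    "partition": 0,
    "offset": 42,
    "timestamp": 1727241409000,
}

row = {
    **json.loads(message["value"]),          # id, text, date columns
    "_fdl_key": message["key"],              # STRING
    "_fdl_topic": message["topic"],          # STRING
    "_fdl_partition": message["partition"],  # INTEGER
    "_fdl_offset": message["offset"],        # LONG
    "_fdl_timestamp": message["timestamp"],  # LONG
}
print(row)
```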

    Pipeline Control Setting 

    Click Next to enter the pipeline task setting page.

The data synchronization task can proceed despite issues such as mismatched field types or lengths and primary key conflicts. You can set an upper limit on the dirty data volume; the task aborts when the limit is reached.

Note: A maximum of 100,000 dirty data rows can be tolerated. The dirty data count is reset after a restart.
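The limit behaves like a simple counter that aborts the task once the configured threshold is reached, as in this sketch (illustrative only; the names are placeholders):

```python
DIRTY_DATA_LIMIT = 100_000  # maximum tolerated dirty rows; the count resets on restart

dirty_rows = 0

def record_dirty_row() -> None:
    """Count one dirty row and abort the task once the limit is reached."""
    global dirty_rows
    dirty_rows += 1
    if dirty_rows >= DIRTY_DATA_LIMIT:
        raise RuntimeError("dirty data limit reached: aborting the pipeline task")
```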

You can notify specified users of source table structure changes, as shown in the following figure.

    Task Saving and Running 

    Click Save and Start to save and run the task, as shown in the following figure.

    Effect Display 

The real-time data in Kafka has been synchronized to the data table in the target database, as shown in the following figure.