You want to synchronize data stored in Kafka to a database in real time and use the fetched data.
FineDataLink's Data Pipeline supports real-time data synchronization from Kafka to the specified database by configuring the Kafka consumer.
FineDataLink tries to convert strings read from Kafka into JSON objects. Successfully converted data is considered valid and parsed into a two-dimensional table. Data that cannot be converted is skipped and is not treated as dirty data: it does not affect task running and is not counted in the read or output rows, but it triggers error messages similar to "value is not json format" in the log.
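The following minimal Python sketch illustrates this behavior conceptually: each string read from Kafka is parsed as JSON, valid objects are kept, and anything that cannot be parsed is skipped with a log message. The hard-coded sample strings, logger name, and message wording are illustrative assumptions, not FineDataLink's actual implementation.

```python
import json
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("kafka_pipeline_sketch")

# In the real pipeline these strings come from a Kafka consumer;
# here they are hard-coded so the sketch runs standalone.
raw_messages = [
    '{"id": 1, "name": "Alice"}',   # valid JSON -> kept
    'not a json string',            # invalid -> skipped, not dirty data
    '{"id": 2, "name": "Bob"}',     # valid JSON -> kept
]

valid_rows = []
for raw in raw_messages:
    try:
        valid_rows.append(json.loads(raw))
    except json.JSONDecodeError:
        # Skipped data does not abort the task and is not counted in the
        # read/output rows; it only leaves a log message.
        logger.warning("value is not json format: %r", raw)

print(valid_rows)  # only the successfully converted objects remain
```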
Connect FineDataLink to Kafka before synchronizing real-time data from Kafka. For details, see Kafka Data Connection.
Choose Data Pipeline > New > Pipeline Task, as shown in the following figure.
Select the data to be synchronized.
Select Kafka as the data source and select the specified data connection. Select JSON as Return Value Format to convert the string read from Kafka into a JSON object. Successfully converted data is parsed into a two-dimensional table.
Select the objects to be synchronized, as shown in the following figure.
The configuration items in the source selection step are described as follows.
Return Value Format: It defaults to JSON. Standard JSON data is parsed into a two-dimensional table by default. For example, {"id": 1, "name": "Alice"} is parsed into one row with an id column (value 1) and a name column (value Alice). A sketch of this parsing is shown below.
Synchronization Object: You can select all topics in Kafka. You can select up to 5,000 topics in a single task.
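The sketch below shows how a JSON object (or an array of objects) can be flattened into the rows and columns of a two-dimensional table. The to_rows helper is purely illustrative and is not a FineDataLink API.

```python
import json

def to_rows(raw: str):
    """Parse a JSON string and return it as a list of flat row dicts.
    A single object becomes one row; an array of objects becomes one
    row per element. (Illustrative helper, not a FineDataLink API.)"""
    data = json.loads(raw)
    records = data if isinstance(data, list) else [data]
    return [dict(record) for record in records]

# One object -> one row with columns id and name.
print(to_rows('{"id": 1, "name": "Alice"}'))
# An array -> one row per element, same columns.
print(to_rows('[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]'))
```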
Set the target database to which the real-time data will be written (for example, a MySQL database), and select Physical Deletion at Target End and Mark Timestamp During Synchronization, as shown in the following figure.
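The two options roughly correspond to the target-side behavior sketched below: each synchronized row is upserted together with a synchronization timestamp, and deleted records are physically removed from the target table rather than soft-deleted. The sqlite3 backend (standing in for MySQL), the kafka_events table name, and the _sync_time column are assumptions for illustration only; the actual SQL and column names FineDataLink generates may differ.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")  # stand-in for the real MySQL target
conn.execute(
    "CREATE TABLE kafka_events (id INTEGER PRIMARY KEY, name TEXT, _sync_time TEXT)"
)

def upsert(row: dict) -> None:
    # Mark Timestamp During Synchronization: stamp each written row with
    # the time it was synchronized (the column name here is hypothetical).
    conn.execute(
        "INSERT INTO kafka_events (id, name, _sync_time) VALUES (?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET name = excluded.name, _sync_time = excluded._sync_time",
        (row["id"], row["name"], datetime.now(timezone.utc).isoformat()),
    )

def physical_delete(key: int) -> None:
    # Physical Deletion at Target End: the row is actually removed,
    # not just flagged as deleted.
    conn.execute("DELETE FROM kafka_events WHERE id = ?", (key,))

upsert({"id": 1, "name": "Alice"})
upsert({"id": 1, "name": "Alice Updated"})
physical_delete(1)
print(conn.execute("SELECT COUNT(*) FROM kafka_events").fetchone()[0])  # 0 rows remain
```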
Click Next to enter the field mapping page. You can set the target table name and map source fields to target fields.
Fields in the target table are generated by parsing the JSON data converted from the strings read from Kafka.
You can set the target table name and the effective primary key, as shown in the following figure.
The value of _fdl_key in the target table may be null, so no default primary key is set. You can set the primary key yourself.
Fields of the target table are described as follows.
For example, if the JSON data parsed from the string read from Kafka is {"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}, the data will be parsed into id, text, and date fields in the target table, as shown in the following figure.
Note: Currently, fields and types are obtained from only the first 5,000 rows. Content beyond that is not displayed on the Table Field Mapping page.
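As a rough illustration of this mapping, the sketch below derives column names and coarse types from a leading sample of parsed JSON rows, using the example record above. The SAMPLE_LIMIT constant, the infer_fields helper, and the Python type names are assumptions and do not reflect FineDataLink's internal type inference.

```python
import json

SAMPLE_LIMIT = 5000  # only a leading sample of rows is inspected

def infer_fields(rows):
    """Collect column names and a coarse type per column from sample rows.
    (Illustrative only; the real type mapping is done by FineDataLink.)"""
    fields = {}
    for row in rows[:SAMPLE_LIMIT]:
        for key, value in row.items():
            fields.setdefault(key, type(value).__name__)
    return fields

sample = [json.loads('{"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}')]
print(infer_fields(sample))  # {'id': 'int', 'text': 'str', 'date': 'str'}
```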
Click Next to enter the pipeline task setting page.
The data synchronization task can continue running despite dirty data issues such as mismatched field types or lengths and primary key conflicts. You can set an upper limit on the dirty data volume; the task aborts when the limit is reached.
Note: A maximum of 100,000 dirty data rows can be tolerated. The dirty data count is reset after a restart.
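A minimal sketch of this threshold behavior follows: rows that fail to write are counted as dirty data, and the task stops once the configured limit is reached. The write_row function, the exception type, and the simulated failure condition are hypothetical stand-ins for FineDataLink's internal logic.

```python
DIRTY_DATA_LIMIT = 100_000  # upper bound configurable in the task settings

class DirtyDataLimitExceeded(RuntimeError):
    """Raised when the accumulated dirty rows reach the configured limit."""

def write_row(row):
    # Hypothetical target write; raise ValueError to simulate dirty data
    # such as a type mismatch or a primary key conflict.
    if not isinstance(row.get("id"), int):
        raise ValueError("type mismatch on column 'id'")

def run_sync(rows):
    dirty_count = 0
    for row in rows:
        try:
            write_row(row)
        except ValueError:
            dirty_count += 1  # the task keeps running on dirty data ...
            if dirty_count >= DIRTY_DATA_LIMIT:
                # ... until the configured limit is reached, then it aborts.
                raise DirtyDataLimitExceeded(f"{dirty_count} dirty rows")
    return dirty_count

print(run_sync([{"id": 1}, {"id": "oops"}, {"id": 2}]))  # -> 1 dirty row tolerated
```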
You can notify the specified user of the source table structure changes, as shown in the following figure.
Click Save and Start to save and run the task, as shown in the following figure.
The real-time data in Kafka has been synchronized to the data table in the target database, as shown in the following figure.