1. 概述
1.1 版本
FineDataLink 版本 | 功能變動 |
---|---|
4.0.20.1 | 資料管道支援從 Kafka 資料源中取數 |
4.0.27 | 交互體驗優化 |
4.0.30 | 選擇來源步驟中,隱藏「同步類型」設定項 |
1.2 應用場景
使用者需要將存在 Kafka 中的資料實時同步到資料庫,並使用取出的資料。
1.3 功能簡介
資料管道任務中,支援從 Kafka 資料源中取數,寫入到指定資料庫。
1.4 使用限制
1)支援 0.10.2 到 3.4 的 Kafka 版本。
2)同步的資料格式限制為 JSON 格式。
FineDataLink 會將從 Kafka 讀取的字串,嘗試轉換成 JSON 格式,如果能進行轉換的資料,就會視為有效資料,並將其解析成二維表;不能轉換的資料會忽略並且不計入髒資料,也不影響任務運作,在日誌中會有類似 value is not json format 的報錯,且這些資料不會統計在讀取和匯出的行數裏。
2. 操作步驟
2.1 前提條件
1)資料管道環境準備:数据管道使用说明
2)需要先建立 Kafka 資料連結。詳情參見:配置Kafka資料源
2.2 選擇來源
1)建立管道任務,資料源選擇 Kafka,選擇同步物件,點選「下一步」按鈕。如下圖所示:
各設定項說明如下表所示:
設定項 | 說明 |
---|---|
資料源 | 選擇有權限的且已配置好的 Kafka 資料連結 |
訊息格式 | 預設為 JSON 預設對標準格式的 JSON 資料進行解析,將其解析為二維表 例如:{"id":1, "name": "張三"}、[ {"name": "張三"}, {"name": "bbb"}] 注:不支援解析多層嵌套格式的JSON |
同步起點 | Kafka資料源目前不支援自訂同步起點,將從所選 Topic 的各個分割槽 Earliest Offset 開始同步 |
同步類型 | 4.0.30 及之前版本: 預設全量+增量同步 首次同步會從 topic 的各個分割槽 earliest offset 開始訂閱消費 若存在訊息消費記錄,會恢複到之前 offset 開始訂閱消費 4.0.30 及之後版本: 該設定項被隱藏,預設為僅增量同步 |
同步物件 | 支援選擇 Kafka 中的所有 topic 注:單個任務限制最多選取5000個topic,達到限制時不允許新增選擇 |
2.3 選擇資料去向
設定實時同步寫入資料庫,例如選擇 MySQL 資料庫,並設定「目標端執行邏輯刪除」,開啟「同步時標記時間戳」,點選「下一步」。如下圖所示:
注:關於「同步時標記時間戳」、「目標端執行物理/邏輯刪除」說明請參見:配置管道任务-选择数据去向
2.4 設定表欄位映射
設定目標表名稱,設定目標表主鍵,點選「下一步」。如下圖所示:
目標資料表即已經讀取並能轉換為 JSON 格式的資料解析生成的欄位。
注1:目標表物理主鍵不能為空,以此保證寫入資料的唯一性;本步驟詳細說明請參見:配置管道任务-表字段映射
注2:對於目標表,由於_fdl_key 值可能為空,所以不提供預設主鍵,使用者可自行選擇主鍵。
解析後的目標表欄位說明如下:
獲取的欄位 | 欄位說明 | FDL中的欄位類型 |
---|---|---|
_fdl_key | 訊息的key | STRING |
資料格式為Json的解析後的欄位 | Message記錄的headers反序列化得到的二維表資料 | 根據使用者表結構獲取二維表欄位 |
_fdl_topic | 訊息的Topic名稱 | STRING |
_fdl_partition | 當前訊息所在分割槽 | INTEGER |
_fdl_offset | 當前訊息的偏移 | LONG |
_fdl_timestamp | 當前訊息的時間戳 | LONG |
比如來源端讀取字串資料,並解析成的 JSON 資料格式為{"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}
則解析後目標表欄位資料為 id、text、date,如下圖所示:
注:目前僅支援獲取前 5000 行欄位和欄位類型,如果某欄位在 5000 行之後出現,則在欄位映射建表時不會出現該欄位;欄位類型同理。
2.5 設定管道控制
各設定項說明請參見:配置管道任务-管道控制
儲存並啟動任務後,可在目標資料庫中查看同步後的資料。任務維運介面說明請參見:单个管道任务管理、管道任务运维