1. 概述
1.1 版本說明
| FineDataLink 版本 | 功能變動 |
|---|---|
| 4.0.5 | - |
| 4.1.3 | 管道任務寫入端主動檢查 Kafka 連結是否異常,若異常,則日誌提示並終止任務 |
| 4.1.13.2 | 認證方式新增「Kerberos 認證」 |
| 4.2.4.3 | 傳輸佇列配置位置修改為「管理系統>資料連結>實時採集任務」下,同時管理資料管道任務和實時任務 |
1.2 應用場景
在進行實時同步程式中,需要透過資料管道暫存來源資料庫中的資料,便於目標資料庫寫入資料,實現實時資料同步。
因此在設定配置管道任務和實時任務前需要首先配置好暫存資料的中間軟體。
1.3 功能說明
FineDataLink 支援使用 Kafka 作為資料同步的中間軟體,實現:
讀寫兩端分離,以保證在持續增量同步程式中,讀寫兩端不會互相阻擋;
系統的短暫當機後,已讀取的資料可以不必再重複讀取;
不能正常寫入目標庫的髒資料能夠暫時儲存;
幫助你更好地實現實時資料同步。
2. 使用限制&注意事項
2.1 使用限制
當前預設使用 Kafka 開源流處理平台。
只有 FDL 工程的超管才能配置傳輸佇列。
2.2 注意事項
實時管道任務刪除進入回收站後,若未徹底刪除,則在傳輸佇列 Kafka 中對應 Topic 不清理。
3. 前提條件
3.1 放置驅動
注:4.0.6 以及之後的版本已經內建了驅動,不需要此步驟。
驅動包:kafka.zip
使用前需要將驅動包中驅動解壓,並放置在 FineDataLink 工程%FineDataLink%\webapps\webroot\WEB-INF\lib目錄下。
3.2 增加配置
如果部署Kafka和 FDL 在同一台伺服器上,則直接參考本文第三節內容配置傳輸佇列即可。
但是如果部署Kafka和 FDL 不在一台伺服器上,Kafka 就需要進行一些單獨配置實現 Kafka 的跨伺服器存取。
1)若只需要內網存取 Kafka ,或者需要外網存取但是機器有外網網卡,此時只需要開啟 Kafka 安裝目錄下/config/kraft目錄中的 server.properties 檔案,把 Kafka 伺服器的 IP 和埠配置到 listeners,輸入以下程式碼:
注:一般只推薦配置內網埠。
如果需要外網存取但是機器沒有外網網卡,則需要開啟 Kafka 安裝目錄下/config/kraft目錄中的 server.properties 檔案,把 Kafka 伺服器的 IP 和埠配置到 listeners 和 advertised.listeners,如下圖所示:
注:Kafka 預設埠號為9092,可根據實際情況修改;上述程式碼的 ip 改為 kafka 的伺服器 ip 。

2)重啟 Kafka ,即先關閉 Kafka 再啟動,詳情參見:維運命令
4. 功能說明
1)進入 FineDataLink 介面,點選「管理系統>資料連結>實時採集任務」,選擇「全局設定」按鈕。如下圖所示:

2)配置介面如下圖所示:

4.1 傳輸佇列配置
4.1.1 Kafka服務地址
4.2.10.4 之前版本:
1)支援 Kafka 單機或叢集,填寫 IP 地址或主機名以及埠號,多個地址以,隔開。
2)預設埠號為 9092。
4.2.10.4 及之後版本不同的是:
1)Kafka 叢集需要點選「新增節點」按鈕,配置主機和埠。
2)4.2.10.4 之前版本&傳輸佇列使用 Kafka 叢集&Kafka 某個節點異常時,如果異常的那個節點上剛好有 Topic,就會異常
4.2.10.4 及之後版本,支援配置 replication.factor 參數,為 Topic 預設複本數,分佈在不同的節點上,當某個節點發生故障時,其他節點上的複本依然可用,進而保證了資料的可用性和故障容許度性。
4.1.2 認證方式
認證方式支援:無認證、帳號密碼、Kerberos。
「Kerberos 認證」相關說明請參見:配置Kafka資料源;若實時管道的來源端為 Kafka,若配置資料連結、傳輸佇列都需要「Kerberos 認證」,則兩邊都配置「Kerberos 認證」即可。
4.1.3 健康檢查
4.2.10.4 版本新增。
| 設定項 | 說明 |
|---|---|
| 健康檢查間隔 |
|
| 最大等待時間 |
|
4.1.4 參數配置
4.2.10.4 版本新增。
| 參數 | 說明 | 生效時機 |
|---|---|---|
| max.request.size | 1)單批次請求最大數據量,單位:位元組;對應 Kafka 參數:max.request.size 2)必須與 Broker 的 message.max.bytes 匹配:
消費者單次拉取的資料量(max.partition.fetch.bytes)必須 ≥ max.request.size,否則無法消費大訊息 |
|
| max.partition.fetch.bytes | 1)從單個分割槽拉取的最大數據量,單位:位元組 2)與 Broker 參數的聯動 必須 ≥ Broker 的 message.max.bytes(Broker 允許的最大訊息大小),否則消費者可能無法拉取大訊息,導致 RecordTooLargeException |
|
| fetch.max.wait.ms | 1)拉取請求的最大等待時間,單位:毫秒 2)若在等待時間內累積的資料達到 fetch.min.bytes,則立即傳回;否則等待直到逾時 與 fetch.min.bytes 配合控制吞吐和延遲 | |
| fetch.min.bytes | 1)單次拉取請求的最小資料量,單位:毫秒 2)等待累積足夠資料或達到 fetch.max.wait.ms 逾時後回應 | |
| replication.factor | 1)建立 Topic 預設複本數 2)需 ≤ Broker 總數 3)修改後僅對建立立的 Topic 生效 4)範例說明: 傳輸佇列對接的 Kafka 叢集為三節點,即有 3 個 Broker,期望能夠提高傳輸佇列的高可用性 將 replication.factor 修改為3,則後續新加的 Topic 預設都會存放 3 個複本 由於複本分佈在不同節點上,可以確定單個節點當機時,其他節點上的複本仍可用 | 立即生效 |
| 其他說明 | 支援點選+號,自訂新增參數: 1)自訂新增的參數,要求不能重名,但是生產者/消費者各自的參數,可能會有重複的,因此重名校驗,只在生產者/消費者/Topic下各自進行 2)自訂新增的參數,需要符合 Kafka 的參數要求,否則不生效 3)生產者或消費者的的配置參數在應用程式啟動時載入到記憶體中,因此在運作時修改參數值,不會自動生效。需要重啟 FineDataLink 才能生效 | |
4.2 磁碟管理
介面如下圖所示:

4.2.1 磁碟清理配置
注:僅對 FDL 傳輸佇列建立的 Topic 生效;如果是 Kafka 叢集,清理配置對每一個 Kafka 服務節點都生效。
| 設定項 | 說明 | 生效時機 |
|---|---|---|
| 暫存時間上限 |
| 立即生效 |
| 單個 Topic 儲存上限(4.2.10.4 版本新增) |
|
4.2.2 磁碟警報配置
4.2.10.4 版本新增。
每 1h 檢查一次,各個叢集節點單獨檢查&通知,同一節點的通知 6h 內不重複通知。
配置說明請參見:任務控制-結果通知
| 通知時機 | 通知內容格式 |
|---|---|
| 當 Kafka 所在儲存目錄,可用空間低於設定的數值時(最大可設定為 99999 ) | 標題: [傳輸佇列] 傳輸佇列 Kafka可用磁碟空間不足 內容: 傳輸佇列 Kafka 服務[ip:port]可用磁碟空間已低於預警值,請即時調整可用磁碟空間,避免傳輸佇列出現異常 |
4.2.3 磁碟使用情況
4.2.10.4 版本新增。
介面如下圖所示:
Kafka 叢集會展示不同kafka服務節點下的監視。
1)重新整理頻率:5 分鐘一次。
2)展示指標:
資料佔用儲存量:查看 Kafka 資料目錄總佔用。
剩餘可用儲存量:查詢 Kafka 資料目錄剩餘可用空間。
4.3 後續步驟
5. 注意事項
5.1 修改 Kafka 配置
若你修改了 Kafka 配置,可能會導致之前設定的資料管道任務中暫存在 Kafka 中的已讀取資料丟失,請謹慎修改,如下圖所示:

5.2 Kafka 傳輸佇列連結異常
當連結異常的情況下注:包括 Kafka 傳輸佇列自身連結異常和使用者手動調整了 Kafka 傳輸佇列連結,導致異常。
管道任務寫入端主動檢查 Kafka 是否異常,若異常,則日誌提示並終止任務。

6. 相容說明
4.2.10.4 之前的版本,升級到 4.2.10.4 及之後版本:
| 設定項 | 升級前 | 升級後 | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Kafka 連結地址欄 | 一個輸入框裏填寫多個IP、埠 | 自動變成多主機地址的表格 | ||||||||||||||||||||||||
| 參數 | 之前在 FineDB 裏配置的參數 | 升級為對應參數配置
|
