管道任務寫入端主動檢查 Kafka 連結是否異常,若異常,則日誌提示並終止任務
在進行實時同步程式中,需要透過資料管道暫存來源資料庫中的資料,便於目標資料庫寫入資料,實現實時資料同步。
因此在設定配置管道任務和實時任務前需要首先配置好暫存資料的中間軟體。
FineDataLink 支援使用 Kafka 作為資料同步的中間軟體,實現:
讀寫兩端分離,以保證在持續增量同步程式中,讀寫兩端不會互相阻擋;
系統的短暫當機後,已讀取的資料可以不必再重複讀取;
不能正常寫入目標庫的髒資料能夠暫時儲存;
幫助你更好地實現實時資料同步。
當前預設使用 Kafka 開源流處理平台。
只有 FDL 工程的超管才能配置傳輸佇列。
實時管道任務刪除進入回收站後,若未徹底刪除,則在傳輸佇列 Kafka 中對應 Topic 不清理。
注:4.0.6 以及之後的版本已經內建了驅動,不需要此步驟。
驅動包:kafka.zip
使用前需要將驅動包中驅動解壓,並放置在 FineDataLink 工程%FineDataLink%\webapps\webroot\WEB-INF\lib目錄下。
如果部署Kafka和 FDL 在同一台伺服器上,則直接參考本文第三節內容配置傳輸佇列即可。
但是如果部署Kafka和 FDL 不在一台伺服器上,Kafka 就需要進行一些單獨配置實現 Kafka 的跨伺服器存取。
1)若只需要內網存取 Kafka ,或者需要外網存取但是機器有外網網卡,此時只需要開啟 Kafka 安裝目錄下/config/kraft目錄中的 server.properties 檔案,把 Kafka 伺服器的 IP 和埠配置到 listeners,輸入以下程式碼:
注:一般只推薦配置內網埠。
listeners=PLAINTEXT://ip:9092
如果需要外網存取但是機器沒有外網網卡,則需要開啟 Kafka 安裝目錄下/config/kraft目錄中的 server.properties 檔案,把 Kafka 伺服器的 IP 和埠配置到 listeners 和 advertised.listeners,如下圖所示:
listeners=PLAINTEXT://ip:9092advertised.listeners=PLAINTEXT://ip:9092
注:Kafka 預設埠號為9092,可根據實際情況修改;上述程式碼的 ip 改為 kafka 的伺服器 ip 。
2)重啟 Kafka ,即先關閉 Kafka 再啟動,詳情參見:維運命令
1)進入 FineDataLink 介面,點選「管理系統>資料連結>實時採集任務」,選擇「全局設定」按鈕。如下圖所示:
2)配置介面如下圖所示:
4.2.10.4 之前版本:
1)支援 Kafka 單機或叢集,填寫 IP 地址或主機名以及埠號,多個地址以,隔開。
2)預設埠號為 9092。
4.2.10.4 及之後版本不同的是:
1)Kafka 叢集需要點選「新增節點」按鈕,配置主機和埠。
2)4.2.10.4 之前版本&傳輸佇列使用 Kafka 叢集&Kafka 某個節點異常時,如果異常的那個節點上剛好有 Topic,就會異常
4.2.10.4 及之後版本,支援配置 replication.factor 參數,為 Topic 預設複本數,分佈在不同的節點上,當某個節點發生故障時,其他節點上的複本依然可用,進而保證了資料的可用性和故障容許度性。
認證方式支援:無認證、帳號密碼、Kerberos。
「Kerberos 認證」相關說明請參見:配置Kafka資料源;若實時管道的來源端為 Kafka,若配置資料連結、傳輸佇列都需要「Kerberos 認證」,則兩邊都配置「Kerberos 認證」即可。
4.2.10.4 版本新增。
每隔一段時間,主動檢查 Kafka 連結的可用性
單位為秒
如果無運作中的實時採集任務,則停止健康檢查
單次檢查時,若超過一段時間不回應,則認為此次檢查不透過
1)單批次請求最大數據量,單位:位元組;對應 Kafka 參數:max.request.size
2)必須與 Broker 的 message.max.bytes 匹配:
Broker 預設允許的最大訊息大小為 1MB(由 message.max.bytes 控制);若 max.request.size> message.max.bytes,Broker 會拒絕訊息
與消費者參數的聯動
消費者單次拉取的資料量(max.partition.fetch.bytes)必須 ≥ max.request.size,否則無法消費大訊息
Kafka 作為傳輸佇列時:立即生效
Kafka 作為資料連結時:用到 Kafka 作為來源/去向的實時管道/實時任務,任務再次啟動時生效
1)從單個分割槽拉取的最大數據量,單位:位元組
2)與 Broker 參數的聯動
必須 ≥ Broker 的 message.max.bytes(Broker 允許的最大訊息大小),否則消費者可能無法拉取大訊息,導致 RecordTooLargeException
Kafka 作為傳輸佇列時:實時管道/實時任務,再次啟動後生效
Kafka 作為資料連結時:用到Kafka作為來源/去向的實時管道/實時任務,任務再次啟動後生效
1)拉取請求的最大等待時間,單位:毫秒
2)若在等待時間內累積的資料達到 fetch.min.bytes,則立即傳回;否則等待直到逾時
與 fetch.min.bytes 配合控制吞吐和延遲
1)單次拉取請求的最小資料量,單位:毫秒
2)等待累積足夠資料或達到 fetch.max.wait.ms 逾時後回應
1)建立 Topic 預設複本數
2)需 ≤ Broker 總數
3)修改後僅對建立立的 Topic 生效
4)範例說明:
傳輸佇列對接的 Kafka 叢集為三節點,即有 3 個 Broker,期望能夠提高傳輸佇列的高可用性
將 replication.factor 修改為3,則後續新加的 Topic 預設都會存放 3 個複本
由於複本分佈在不同節點上,可以確定單個節點當機時,其他節點上的複本仍可用
支援點選+號,自訂新增參數:
1)自訂新增的參數,要求不能重名,但是生產者/消費者各自的參數,可能會有重複的,因此重名校驗,只在生產者/消費者/Topic下各自進行
2)自訂新增的參數,需要符合 Kafka 的參數要求,否則不生效
3)生產者或消費者的的配置參數在應用程式啟動時載入到記憶體中,因此在運作時修改參數值,不會自動生效。需要重啟 FineDataLink 才能生效
介面如下圖所示:
注:僅對 FDL 傳輸佇列建立的 Topic 生效;如果是 Kafka 叢集,清理配置對每一個 Kafka 服務節點都生效。
單位為天,預設值為7
修改後,對歷史和後續建立的 Topic 均生效
暫存時間不可超過 90,超出後將採用“先進先出”的原則清理
單位為 G,最大值為 99999
配置每個 Topic 的儲存用量上限,到達上限後會自動清理一批時間靠前的資料
每 1h 檢查一次,各個叢集節點單獨檢查&通知,同一節點的通知 6h 內不重複通知。
配置說明請參見:任務控制-結果通知
標題:
[傳輸佇列] 傳輸佇列 Kafka可用磁碟空間不足
內容:
傳輸佇列 Kafka 服務[ip:port]可用磁碟空間已低於預警值,請即時調整可用磁碟空間,避免傳輸佇列出現異常
Kafka 叢集會展示不同kafka服務節點下的監視。
1)重新整理頻率:5 分鐘一次。
2)展示指標:
資料佔用儲存量:查看 Kafka 資料目錄總佔用。
剩餘可用儲存量:查詢 Kafka 資料目錄剩餘可用空間。
進行快取配置後,即可建立資料管道任務和實時任務,詳情參見:配置管道任務、實時任務
若你修改了 Kafka 配置,可能會導致之前設定的資料管道任務中暫存在 Kafka 中的已讀取資料丟失,請謹慎修改,如下圖所示:
當連結異常的情況下注:包括 Kafka 傳輸佇列自身連結異常和使用者手動調整了 Kafka 傳輸佇列連結,導致異常。
管道任務寫入端主動檢查 Kafka 是否異常,若異常,則日誌提示並終止任務。
4.2.10.4 之前的版本,升級到 4.2.10.4 及之後版本:
升級為對應參數配置
生產者:security.protocol
消費者:security.protocol
生產者:ssl.truststore.location
消費者:ssl.truststore.location
生產者:ssl.truststore.password
消費者:ssl.truststore.password
消費者:ssl.keystore.location
生產者:ssl.keystore.password
消費者:ssl.keystore.password
生產者:ssl.key.password
消費者:ssl.key.password
滑鼠選中內容,快速回饋問題
滑鼠選中存在疑惑的內容,即可快速回饋問題,我們將會跟進處理。
不再提示
10s後關閉
反馈已提交
网络繁忙