1. 概述
1.1 版本说明
FineDataLink 版本 | 功能变动 |
---|---|
4.0.5 | - |
4.1.3 | 管道任务写入端主动检测 Kafka 连接是否异常,若异常,则日志提示并终止任务 |
4.1.13.2 | 认证方式新增「Kerberos 认证」 |
4.2.4.3 | 传输队列配置位置修改为「管理系统>数据连接>实时采集任务」下,同时管理数据管道任务和实时任务 |
4.2.10.4 |
|
1.2 应用场景
在进行实时同步过程中,需要通过实时采集任务暂存来源数据库中的数据,便于目标数据库写入数据,实现实时数据同步。
因此在设置数据管道任务和实时任务前需要首先配置好暂存数据的中间件。
1.3 功能说明
FineDataLink 支持使用 Kafka 作为数据同步的中间件,实现:
读写两端分离,以保证在持续增量同步过程中,读写两端不会互相阻塞。
系统的短暂宕机后,已读取的数据可以不必再重复读取。
不能正常写入目标库的脏数据能够暂时存储。
帮助你更好地实现实时数据同步。
2. 使用限制&注意事项
2.1 使用限制
当前默认使用 Kafka 开源流处理平台。
只有 FDL 工程的超管才能配置传输队列。
2.2 注意事项
实时管道任务删除进入回收站后,若未彻底删除,则在传输队列 Kafka 中对应 Topic 不清理。
3. 前提条件
3.1 放置驱动
注:4.0.6 以及之后的版本已经内置了驱动,不需要此步骤。
驱动包:kafka.zip
使用前需要将驱动包中驱动解压,并放置在 FineDataLink 工程%FineDataLink%\webapps\webroot\WEB-INF\lib 目录下。
3.2 增加配置
如果部署的Kafka和 FDL 在同一台服务器上,则直接参考本文第三节内容配置传输队列即可。
但是如果部署的Kafka和 FDL 不在一台服务器上,Kafka 就需要进行一些单独配置实现 Kafka 的跨服务器访问。
1)若只需要内网访问 Kafka ,或者需要外网访问但是机器有外网网卡,此时只需要打开 Kafka 安装目录下/config/kraft目录中的 server.properties 文件,把 Kafka 服务器的 IP 和端口配置到 listeners,输入以下代码:
注:一般只推荐配置内网端口。
listeners=PLAINTEXT://ip:9092
如果需要外网访问但是机器没有外网网卡,则需要打开 Kafka 安装目录下/config/kraft目录中的 server.properties 文件,把 Kafka 服务器的 IP 和端口配置到 listeners 和 advertised.listeners,如下图所示:
listeners=PLAINTEXT://ip:9092
advertised.listeners=PLAINTEXT://ip:9092
注:Kafka 默认端口号为9092,可根据实际情况修改;上述代码的 ip 改为 kafka 的服务器 ip 。
2)重启 Kafka ,即先关闭 Kafka 再启动,详情参见:运维命令
4. 功能说明
1)进入 FineDataLink 工程,点击「管理系统>数据连接>实时采集任务」,选择「全局设置」按钮。如下图所示:
2)配置界面如下图所示:
4.1 传输队列配置
4.1.1 Kafka服务地址
4.2.10.4 之前版本:
1)支持 Kafka 单机或集群,填写 IP 地址或主机名以及端口号,多个地址以,隔开。
2)默认端口号为 9092。
4.2.10.4 及之后版本不同的是:
1)Kafka 集群需要点击「添加节点」按钮,配置主机和端口。
2)4.2.10.4 之前版本&传输队列使用 Kafka 集群&Kafka 某个节点异常时,如果异常的那个节点上刚好有 Topic,就会异常
4.2.10.4 及之后版本,支持配置 replication.factor 参数,为 Topic 默认副本数,分布在不同的节点上,当某个节点发生故障时,其他节点上的副本依然可用,从而保证了数据的可用性和容错性。
4.1.2 认证方式
认证方式支持:无认证、用户名密码、Kerberos。
「Kerberos 认证」相关说明请参见:配置Kafka数据源;若实时管道的来源端为 Kafka,若配置数据连接、传输队列都需要「Kerberos 认证」,则两边都配置「Kerberos 认证」即可。
4.1.3 健康检查
4.2.10.4 版本新增。
设置项 | 说明 |
---|---|
健康检查间隔 |
|
最大等待时间 |
|
4.1.4 参数配置
4.2.10.4 版本新增。
参数 | 说明 | 生效时机 |
---|---|---|
max.request.size | 1)单批次请求最大数据量,单位:字节;对应 Kafka 参数:max.request.size 2)必须与 Broker 的 message.max.bytes 匹配:
消费者单次拉取的数据量(max.partition.fetch.bytes)必须 ≥ max.request.size,否则无法消费大消息 |
|
max.partition.fetch.bytes | 1)从单个分区拉取的最大数据量,单位:字节 2)与 Broker 参数的关联 必须 ≥ Broker 的 message.max.bytes(Broker 允许的最大消息大小),否则消费者可能无法拉取大消息,导致 RecordTooLargeException |
|
fetch.max.wait.ms | 1)拉取请求的最大等待时间,单位:毫秒 2)若在等待时间内累积的数据达到 fetch.min.bytes,则立即返回;否则等待直到超时 与 fetch.min.bytes 配合控制吞吐和延迟 | |
fetch.min.bytes | 1)单次拉取请求的最小数据量,单位:毫秒 2)等待累积足够数据或达到 fetch.max.wait.ms 超时后响应 | |
replication.factor | 1)新建 Topic 默认副本数 2)需 ≤ Broker 总数 3)修改后仅对新创建的 Topic 生效 4)示例说明: 传输队列对接的 Kafka 集群为三节点,即有 3 个 Broker,期望能够提高传输队列的高可用性 将 replication.factor 修改为3,则后续新加的 Topic 默认都会存放 3 个副本 由于副本分布在不同节点上,可以确保单个节点宕机时,其他节点上的副本仍可用 | 立即生效 |
其他说明 | 支持点击+号,自定义添加参数: 1)自定义添加的参数,要求不能重名,但是生产者/消费者各自的参数,可能会有重复的,因此重名校验,只在生产者/消费者/Topic下各自进行 2)自定义添加的参数,需要符合 Kafka 的参数要求,否则不生效 3)生产者或消费者的的配置参数在应用程序启动时加载到内存中,因此在运行时修改参数值,不会自动生效。需要重启 FineDataLink 才能生效 |
4.2 磁盘管理
界面如下图所示:
4.2.1 磁盘清理配置
注:仅对 FDL 传输队列创建的 Topic 生效;如果是 Kafka 集群,清理配置对每一个 Kafka 服务节点都生效。
设置项 | 说明 | 生效时机 |
---|---|---|
暂存时间上限 |
| 立即生效 |
单个 Topic 存储上限(4.2.10.4 版本新增) |
|
4.2.2 磁盘告警配置
4.2.10.4 版本新增。
每 1h 检测一次,各个集群节点单独检测&通知,同一节点的通知 6h 内不重复通知。
配置说明请参见:任务控制-结果通知
通知时机 | 通知内容格式 |
---|---|
当 Kafka 所在存储目录,可用空间低于设定的数值时(最大可设置为 99999 ) | 标题: [传输队列] 传输队列 Kafka可用磁盘空间不足 内容: 传输队列 Kafka 服务[ip:port]可用磁盘空间已低于预警值,请及时调整可用磁盘空间,避免传输队列出现异常 |
4.2.3 磁盘使用情况
4.2.10.4 版本新增。
界面如下图所示:
Kafka 集群会展示不同kafka服务节点下的监控。
1)刷新频率:5 分钟一次。
2)展示指标:
数据占用存储量:查看 Kafka 数据目录总占用。
剩余可用存储量:查询 Kafka 数据目录剩余可用空间。
4.3 后续步骤
5. 注意事项
5.1 修改 Kafka 配置
若你修改了 Kafka 配置,可能会导致之前设置的数据管道任务中暂存在 Kafka 中的已读取数据丢失,请谨慎修改,如下图所示:
5.2 Kafka 传输队列连接异常
当连接异常的情况下注:包括 Kafka 传输队列自身连接异常和用户手动调整了 Kafka 传输队列连接,导致异常。
管道任务写入端主动检测 Kafka 是否异常,若异常,则日志提示并终止任务。
6. 兼容说明
4.2.10.4 之前的版本,升级到 4.2.10.4 及之后版本:
设置项 | 升级前 | 升级后 | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Kafka 连接地址栏 | 一个输入框里填写多个IP、端口 | 自动变成多主机地址的表格 | ||||||||||||||||||||||||
参数 | 之前在 FineDB 里配置的参数 | 升级为对应参数配置
|