1. 概述
1.1 版本
FineDataLink 版本 | 功能变动 |
---|---|
4.0.20.1 | 数据管道支持从 Kafka 数据源中取数 |
4.0.27 | 交互体验优化 |
4.0.30 | 选择来源步骤中,隐藏「同步类型」设置项 |
1.2 应用场景
用户需要将存在 Kafka 中的数据实时同步到数据库,并使用取出的数据。
1.3 功能简介
数据管道任务中,支持从 Kafka 数据源中取数,写入到指定数据库。
1.4 使用限制
1)支持 0.10.2 到 3.4 的 Kafka 版本。
2)同步的数据格式限制为 JSON 格式。
FineDataLink 会将从 Kafka 读取的字符串,尝试转换成 JSON 格式,如果能进行转换的数据,就会视为有效数据,并将其解析成二维表;不能转换的数据会忽略并且不计入脏数据,也不影响任务运行,在日志中会有类似 value is not json format 的报错,且这些数据不会统计在读取和输出的行数里。
2. 操作步骤
2.1 前提条件
1)数据管道环境准备:数据管道使用说明
2)需要先建立 Kafka 数据连接。详情参见:配置Kafka数据源
2.2 选择来源
1)新建管道任务,数据源选择 Kafka,选择同步对象,点击「下一步」按钮。如下图所示:
各设置项说明如下表所示:
设置项 | 说明 |
---|---|
数据源 | 选择有权限的且已配置好的 Kafka 数据连接 |
消息格式 | 默认为 JSON 默认对标准格式的 JSON 数据进行解析,将其解析为二维表 例如: {"id":1, "name": "张三"}、[ {"name": "张三"}, {"name": "bbb"}] 注:不支持解析多层嵌套格式的JSON |
同步起点 | Kafka数据源目前不支持自定义同步起点,将从所选 Topic 的各个分区 Earliest Offset 开始同步 |
同步类型 | 4.0.30 及之前版本: 默认全量+增量同步 首次同步会从 topic 的各个分区 earliest offset 开始订阅消费 若存在消息消费记录,会恢复到之前 offset 开始订阅消费 4.0.30 及之后版本: 该设置项被隐藏,默认为仅增量同步 |
同步对象 | 支持选择 Kafka 中的所有 topic 注:单个任务限制最多选取5000个topic,达到限制时不允许新增选择 |
2.3 选择数据去向
设置实时同步写入数据库,例如选择 MySQL 数据库,并设置「目标端执行逻辑删除」,开启「同步时标记时间戳」,点击「下一步」。如下图所示:
注:关于「同步时标记时间戳」、「目标端执行物理/逻辑删除」说明请参见:配置管道任务-选择数据去向
2.4 设置表字段映射
设置目标表名称,设置目标表主键,点击「下一步」。如下图所示:
目标数据表即已经读取并能转换为 JSON 格式的数据解析生成的字段。
注1:目标表物理主键不能为空,以此保证写入数据的唯一性;本步骤详细说明请参见:配置管道任务-表字段映射
注2:对于目标表,由于_fdl_key 值可能为空,所以不提供默认主键,用户可自行选择主键。
解析后的目标表字段说明如下:
获取的字段 | 字段说明 | FDL中的字段类型 |
---|---|---|
_fdl_key | 消息的key | STRING |
数据格式为Json的解析后的字段 | Message记录的headers反序列化得到的二维表数据 | 根据用户表结构获取二维表字段 |
_fdl_topic | 消息的Topic名称 | STRING |
_fdl_partition | 当前消息所在分区 | INTEGER |
_fdl_offset | 当前消息的偏移量 | LONG |
_fdl_timestamp | 当前消息的时间戳 | LONG |
比如来源端读取字符串数据,并解析成的 JSON 数据格式为 {"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}
则解析后目标表字段数据为 id、text、date,如下图所示:
注:目前仅支持获取前 5000 行字段和字段类型,如果某字段在 5000 行之后出现,则在字段映射建表时不会出现该字段;字段类型同理。
2.5 设置管道控制
各设置项说明请参见:配置管道任务-管道控制
保存并启动任务后,可在目标数据库中查看同步后的数据。任务运维界面说明请参见:单个管道任务管理、管道任务运维