历史版本13 :实时管道任务-Kafka 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 版本

FineDataLink 版本功能变动
4.0.20.1数据管道支持从 Kafka 数据源中取数
4.0.27交互体验优化
4.0.30选择来源步骤中,隐藏「同步类型」设置项

1.2 应用场景

用户需要将存在 Kafka 中的数据实时同步到数据库,并使用取出的数据。

1.3 功能简介

数据管道任务中,支持从 Kafka 数据源中取数,写入到指定数据库。

1.4 使用限制

1)支持 0.10.2 到 3.4 的 Kafka 版本。

2)同步的数据格式限制为 JSON 格式。

FineDataLink 会将从 Kafka 读取的字符串,尝试转换成 JSON 格式,如果能进行转换的数据,就会视为有效数据,并将其解析成二维表;不能转换的数据会忽略并且不计入脏数据,也不影响任务运行,在日志中会有类似 value is not json format 的报错,且这些数据不会统计在读取和输出的行数里。

2. 操作步骤编辑

2.1 前提条件

1)数据管道环境准备:数据管道使用说明

2)需要先建立 Kafka 数据连接。详情参见:配置Kafka数据源

2.2 选择来源

1)新建管道任务,数据源选择 Kafka,选择同步对象,点击「下一步」按钮。如下图所示:

30.png

各设置项说明如下表所示:

设置项
说明
数据源选择有权限的且已配置好的 Kafka 数据连接
消息格式

默认为 JSON

默认对标准格式的 JSON 数据进行解析,将其解析为二维表

例如: {"id":1, "name": "张三"}[ {"name": "张三"}, {"name": "bbb"}]

注:不支持解析多层嵌套格式的JSON

同步起点Kafka数据源目前不支持自定义同步起点,将从所选 Topic 的各个分区 Earliest Offset 开始同步
同步类型

4.0.30 及之前版本

默认全量+增量同步

首次同步会从 topic 的各个分区 earliest offset 开始订阅消费

若存在消息消费记录,会恢复到之前 offset 开始订阅消费

4.0.30 及之后版本:

该设置项被隐藏,默认为仅增量同步

同步对象

支持选择 Kafka 中的所有 topic

注:单个任务限制最多选取5000个topic,达到限制时不允许新增选择

2.3 选择数据去向

设置实时同步写入数据库,例如选择 MySQL 数据库,并设置「目标端执行逻辑删除」,开启「同步时标记时间戳」,点击下一步。如下图所示:

注:关于同步时标记时间戳目标端执行物理/逻辑删除说明请参见:配置管道任务-选择数据去向

31.png

2.4 设置表字段映射

设置目标表名称,设置目标表主键,点击「下一步」。如下图所示:

目标数据表即已经读取并能转换为 JSON 格式的数据解析生成的字段。

注1:目标表物理主键不能为空,以此保证写入数据的唯一性;本步骤详细说明请参见:配置管道任务-表字段映射

注2:对于目标表,由于_fdl_key 值可能为空,所以不提供默认主键,用户可自行选择主键。

33.png

解析后的目标表字段说明如下:

获取的字段字段说明FDL中的字段类型
_fdl_key消息的keySTRING
数据格式为Json的解析后的字段Message记录的headers反序列化得到的二维表数据根据用户表结构获取二维表字段
_fdl_topic消息的Topic名称STRING
_fdl_partition当前消息所在分区INTEGER
_fdl_offset当前消息的偏移量LONG
_fdl_timestamp当前消息的时间戳LONG

比如来源端读取字符串数据,并解析成的 JSON 数据格式为 {"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}

则解析后目标表字段数据为 id、text、date,如下图所示:

34.png

注:目前仅支持获取前 5000 行字段和字段类型,如果某字段在 5000 行之后出现,则在字段映射建表时不会出现该字段;字段类型同理。

2.5 设置管道控制

各设置项说明请参见:配置管道任务-管道控制

保存并启动任务后,可在目标数据库中查看同步后的数据。任务运维界面说明请参见:单个管道任务管理管道任务运维

35.png