反馈已提交

网络繁忙

数据管道任务-Kafka

  • 文档创建者:Roxy
  • 历史版本:7
  • 最近更新:Wendy123456 于 2023-08-09
  • 1. 概述

    1.1 版本

    版本
    功能变动
    4.0.20.1支持0.10.2到3.4的Kafka版本
    4.0.27交互体验优化

    1.2 应用场景

    用户需要将存在 Kafka 中的数据实时取出同步到数据库,并使用取出的数据。

    1.3 功能说明

    FineDataLink 数据管道支持用户通过配置 Kafka 消费者的方式,实时读取数据至指定数据库

    FineDataLink 会将从 Kafka 读取的字符串,尝试转换成 JSON 格式,如果能进行转换的数据,就会视为有效数据,并将其解析成二维表;不能转换的数据会忽略并且不计入脏数据,也不影响任务运行,在日志中会有类似 value is not json format 的报错,且这些数据不会统计在读取和输出的行数里。

    2. 前提条件

    在使 Kafka 进行实时数据同步前,需要首先建立 Kafka 数据连接,详情参见:配置Kafka数据源

    3. 操作步骤

    3.1 选择数据来源

    选择「数据管道>任务列表>新建管道任务」,如下图所示:

    选择需要数据同步的来源数据。

    选择 Kafka 数据源类型,并设置指定的数据连接,返回值格式选择JSON ,也就是将从 Kafka 读取的字符串,尝试转换成 JSON 格式,如果能进行转换的数据,就会对标准格式的 JSON 数据解析,转换为二维表进行实时同步。

    同时同步类型为全量+增量同步,并选择需要同步的对象,如下图所示:

    数据来源配置项说明如下:

    配置项
    配置说明
    数据源类型展示已配置过数据连接且当前用户有数据连接使用权限的选项
    数据连接

    下拉所有可选的对应上述数据源类型的数据连接。

    返回值格式

    默认为JSON

    默认对标准格式的 JSON 数据进行解析,将其解析为二维表

    例如: {"id":1, "name": "张三"}

    [ {"name": "张三"}, {"name": "bbb"}]

    注:不支持解析多层嵌套格式的JSON。

    同步类型

    默认全量+增量同步

    首次同步会从topic的各个分区earliest offset 开始订阅消费

    若存在消息消费记录,会恢复到之前offset开始订阅消费

    同步对象
    • 支持选择kafka中的所有topic

    注:单个任务限制最多选取5000个topic,达到限制时不允许新增选择。

    3.2 选择数据去向

    设置实时同步写入数据库,例如选择 MySQL 数据库,并设「目标端执行物理删除」,勾选「同步时标记时间戳」,如下图所示:

    注:关于时间戳和逻辑删除标识字段说明,详情参见:设置时间戳和逻辑删除标识字段

    3.3 设置表字段映射

    点击下一步进入字段映射设置界面。可以设置目标表名称;对来源表同步至目标表的字段映射进行设置。

    注:目标数据库数据表物理主键不为空,以此保证写入数据的唯一性

    目标数据表即已经读取并能转换为 JSON 格式的数据解析生成的字段。

    可以设置目标表表名,并手动指定有效的主键,如下图所示:


    注:对与目标表,由于_fdl_key 值可能为空,所以不提供默认主键,用户可自行选择主键。

    解析后的目标表字段说明如下:

    获取的字段字段说明FDL中的字段类型
    _fdl_key消息的keySTRING
    数据格式为Json的解析后的字段Message记录的headers反序列化得到的二维表数据根据用户表结构获取二维表字段
    _fdl_topic消息的Topic名称STRING
    _fdl_partition当前消息所在分区INTEGER
    _fdl_offset当前消息的偏移量LONG
    _fdl_timestamp当前消息的时间戳LONG

    比如来源端读取字符串数据,并解析成的 JSON 数据格式为 {"id": 14, "text": "HhMLQDAGGN", "date": "2010-04-27 06:56:49"}

    则解析后目标表字段数据为 id、text、date,如下图所示:

    注:目前仅支持获取前 5000 行字段和字段类型,如果某字段在 5000 行之后出现,则在字段映射建表时不会出现该字段。字段类型同理。

    3.4 设置管道控制

    点击下一步进行数据管道的任务设置。

    数据同步允许一定的容错,比如字段类型、长度不匹配、主键冲突等等问题,可以设置产生的脏数据上限,达到上限则自动终止管道任务。

    注:限制最多10w行,且重启任务后,会重置阈值统计。

    当源表结构变化时,可以将消息通知给指定用户,如下图所示:

    3.5 保存并运行任务

    点击「保存并启动」,即可保存并运行任务,如下图所示:

    3.6 效果查看

    此时即可在目标数据库 已经实现了数据表的实时同步,如下图所示:





    附件列表


    主题: 数据管道
    已经是第一篇
    已经是最后一篇
    • 有帮助
    • 没帮助
    • 只是浏览
    • 评价文档,奖励 1 ~ 100 随机 F 豆!

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持