提示:产品注册版本为「v5.0」时,才能使用数据开发-实时任务:
1) 如需申请免费试用或增购功能点,请填写链接:实时任务意向使用申请表
2) 如需了解更多实时任务详细信息,可联系技术支持(技术支持联系方式:前往「服务」,选择「在线支持」、电话「400-811-8890」)。
1. 概述
1.1 版本
| FineDataLink 版本 | 功能变动 |
|---|---|
| 5.0.0.1 | - |
| 5.0.0.4 | 支持配置消息格式类型 |
| 5.0.4.3 | 新增设置项「消息键」,支持通过单个或多个字段拼接的方式生成分区键,确保同一业务实体的数据可路由至同一分区,保障业务处理的顺序性 |
1.2 应用场景
用户需要实时读取数据,将数据处理后,将结构化数据序列化成 JSON 格式后放入至 Kafka 。
1.3 功能简介
FineDataLink 「实时任务」支持使用 Kafka 输出算子将数据实时同步输出至 Kafka 。
2. 功能说明
Kafka 输出界面如下图所示:

| 配置项 | 说明 |
|---|---|
| 数据连接 | 配置Kafka数据源 后即可在此处选择 |
| 目标 Topic | 选择写入到 kafka 中的 topic
|
| 消息键 | 应用场景: 在实时计算和数据处理场景中,需要通过多分区的方式提高系统吞吐量,但业务系统常常也需要确保消息的有序消费。例如,在订单、交易、账务等领域,某些事件必须严格按照发生顺序被下游消费,以保证计算结果的正确性 功能说明:
示例: 1)示例一: 需要将字段A、字段B、字段C相同的数据分到相同的 Kafka 分区内,以保证后续的顺序消费 设置消息键取值列:多选字段A、字段B、字段C 2)示例二 A为物理主键,A+B为联合主键,需要基于B,把有关联的A写到一个 Kafka 分区内,以保证字段B相同的数据的顺序 设置消息键取值列:字段B |
| 消息格式类型(5.0.0.4 及之后版本支持) | debezium-json: 按照元数据、事件类型和数据变化情况,构造包含老数据、新数据、操作类型等信息的消息结构,格式为: json: 把一行记录当前的字段值打包成 json 格式,示例为: 注:当前任务中,存在产生更新流的算子时,json 格式禁用;会产生更新流的算子:CDC输入、分组汇总、数据关联(多个实时数据源进行关联)。 |
| 写入方式 |
只支持将数据插入到kafka |
3. 操作步骤
用户将 MySQL 数据库中的结构化数据进行数据处理后实时同步至 kafka 中。
3.1 设置 CDC 输入
进入 FineDataLink 数据平台后,选择「数据开发>实时任务」,新建实时任务,,如下图所示:
注:需要注意的是如果使用 CDC输入,对应的来源端数据库需要开启日志,详情请参见:数据库环境准备概述

在任务管理界面选择「编辑」,如下图所示:

进入编辑界面后,拖入 CDC 输入算子,选择数据源并设置读取方式和同步类型,如下图所示:
注:选择数据源前需要先新建对应的数据连接,详情参见:创建并管理数据源

点击「数据预览」即可看到取出的数据库结构化数据,如下图所示:

3.2 数据处理
对读取的数据进行数据处理,例如新增计算列,新增一个字段拼接货主详细地址,如下图所示:

得到新增列如下图所示:

3.3 数据输出
设置数据实时计算后输出至 Kafka 中,使用 Kafka 输出算子,如下图所示:

3.4 任务启动和管理
点击启动,即可启动实时任务,如下图所示:

3.5 效果查看
在日志中即可看到首次同步运行情况,如下图所示:

输出的 Kafka 数据如下图所示:

若此时来源端数据库中新增了一条数据,那么点击「查看」,即可看到实时任务的运行情况,同步了一条数据到 Kafka 中,如下图所示:

