
产品注册版本为「v5.0共创版」时,才能使用数据开发-实时任务:
1) 如需申请免费试用或增购功能点,请填写链接:实时任务意向使用申请表
2) 如需了解更多实时任务详细信息,可联系技术支持(技术支持联系方式:前往「服务」,选择「在线支持」、电话「400-811-8890」)。
1. 概述
1.1 版本
FineDataLink 版本 | 功能变动 |
---|---|
5.0.0.1 | - |
5.0.1.4 | Topic 中适配通配符:+ 和 # |
5.0.1.5 | 实时任务中若使用 MQTT 输入节点,不能再使用 分组汇总(实时任务)、Flink SQL、数据关联(多个实时数据源进行关联) |
1.2 应用场景
用户需要实时读取 MQTT 中的数据,将数据解析后,写入指定数据库中,并使用数据进行报表和实时大屏制作和展示。
1.3 功能简介
FineDataLink 的「数据开发-实时任务」支持使用「MQTT输入」算子实时读取数据。
1.4 功能限制
5.0.1.5 及之后版本,实时任务中若使用 MQTT 输入节点,不能再使用 分组汇总(实时任务)、Flink SQL、数据关联(多个实时数据源进行关联)。
2. 功能说明
「MQTT输入」算子设置界面如下图所示:
各设置项说明如下:
设置项 | 说明 | |||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
数据源 | 需要先 配置MQTT数据源 的数据连接,才能在此处选择;必填 | |||||||||||||||
Topic | 通过文本框输入 topic,比如:myhome/kitchen/smartdishwasher 5.0.1.4 及之后版本,Topic 中适配通配符:+ 和 #
| |||||||||||||||
输出字段 | 配置读取 MQTT 消息中哪些信息传输给下游算子,可选择「消息内容」、「主题」 1)主题(topic):消息的 topic 名称 2)消息内容(payload):消息的实际内容
| |||||||||||||||
同步类型 | 不可设置,默认为「仅增量同步」,即从指定的起点开始,持续同步新增的变化数据(增/删/改) | |||||||||||||||
增量同步起点 | 默认为「任务启动时间」,不可自定义 | |||||||||||||||
样本设置 | 因为 MQTT 输入只支持获取生成订阅后的消息,所以进行普通的采样,比如去取 mqtt broker中获取已有的数据,这个是无法实现的,并且考虑到 topic 中不一定有「保留消息」,所以「MQTT输入」算子的样本设置中提供两种方式:「手动配置」、「获取保留消息」,默认为「获取保留消息」 1)「获取保留消息」:数据预览时,新建一个不影响运行时逻辑的 client 订阅 topic ,如果 topic 中有保留消息时,则可以获取一条数据 2)「手动配置」:支持添加多条采样数据,如下图所示: | |||||||||||||||
数据预览 | 按照配置的输出字段和样本设置展示数据 如果获取保留消息时,topic 中没有保留消息,则只生成表结构,没有数据行 |
3. 操作步骤
3.1 读取数据
1)登录 FDL 工程,点击「数据开发>实时任务>新建」,新建实时任务。如下图所示:
自定义位置和名称后,点击「编辑」按钮进入实时任务设计页面。如下图所示:
2)拖入「MQTT输入」算子,选择数据源,设置 Topic 和输出字段。如下图所示:
点击「数据预览」,如下图所示:
取出的数据为 JSON 格式数据。
3.2 解析数据
1)拖入「JSON解析」算子,解析 3.1 节取出的数据。如下图所示:
2)点击「数据预览」,如下图所示:
3.3 数据输出
拖入「DB表输出」算子,将数据输出至指定的数据库中。如下图所示:
DB表输出设置详情参见:DB表输出(实时任务)
写入方式默认即可。
3.4 任务启动和管理
点击启动,即可启动实时任务,如下图所示:
同时在任务管理界面看到任务运行状态和运行详情,如下图所示: