1. 概述
1.1 版本
| FineDataLink 版本 | 功能变动 |
|---|---|
| 5.0.4.4 | 实时任务中新增「RabbitMQ输入」算子,读取该数据源 |
1.2 功能简介
数据开发-实时任务中,新增「RabbitMQ输入」算子,支持读取 RabbitMQ 数据。如下图所示:

1.3 使用限制
1)不支持和 使用 Flink 引擎的算子 一起使用。
2)数据一致性只能保证至少一次,且前提为:用户的 RabbitMQ 开启持久化机制(消息持久化、交换机持久化、队列持久化)。
至少一次概念:保证每条数据至少会被处理一次,但可能会有重复。
2.操作步骤
2.1 读取 RabbitMQ 数据
1)登录 FDL 工程,点击「数据开发>实时任务>新建」,新建实时任务,给实时任务自定义名称。如下图所示:

2)进入实时任务编辑界面后,拖入「RabbitMQ输入」算子,设置数据连接、队列、输出字段。如下图所示:

各设置项说明如下表所示:
| 设置项 | 说明 |
|---|---|
| 数据连接 | 1)需要配置 RabbitMQ 数据连接,详情请参见:配置RabbitMQ数据源 2)可点击「数据源权限检测」按钮,检测数据连接是否正常 |
| 队列 | 配置RabbitMQ数据源 文档中,若未放置 JAR 包,则无法自动读取 RabbitMQ 中的队列,此时可手动输入 |
| 输出字段 | 配置读取 RabbitMQ 消息中哪些信息传输给下游算子 1)队列(queue):消息的队列名称 2)消息内容(payload):消息的实际内容 对于不同数据格式的 value(json、键值对、Avro等),统一都转换为文本格式 |
| 同步类型 | 默认为仅增量同步,不可修改 从指定的起点开始,「增量同步」阶段启动,持续同步数据;若队列中有未消费消息将一并被读取 |
| 增量同步起点 | 默认为任务启动时间,不可修改 |
| 样本设置 | 只提供「手动配置」 会根据配置的输出字段,在样本设置弹窗内生成一个N列、1行的二维表,默认生成一条样本数据,用户可以修改其中每一个单元格的内容,各输出字段的默认值为:
|
3)点击「数据预览」,可查看读取的数据。如下图所示:

2.2 后续步骤
1)由于读取的数据是 JSON 数据,用户可拖入「JSON解析」算子进行解析,解析后,根据实际情况继续拖入其他算子处理数据或者将数据输出。
2)启动实时任务即可。
