历史版本2 :Pulsar输入 返回文檔
編輯時間: 內容長度:图片数:目录数: 修改原因:

目錄:

1. 概述编辑

1.1 版本

FineDataLink 版本功能变动
4.1.11.4-

1.2 应用场景

用户需要实时读取 Pulsar 中的数据,将数据解析后,写入指定数据库中,并使用数据进行报表和实时大屏制作和展示。

1.3 功能简介

FineDataLink 「实时任务」支持使用 Pulsar 输入算子进行实时读取数据。

2. 功能说明编辑

配置项说明
数据连接

配置Kafka数据源之后即可选择对应数据连接

Topic

手动输入读取 Pulsar 中的topic

例如:persistent://public/default/my-topic

同步类型
  • 存量+增量同步:初次启动任务时,同步 Pulsar 中所有数据,并持续同步 Pulsar 中的增量数据,再次启动任务时,从上次执行结束的断点继续同步;如果断点状态丢失,则按照初次启动逻辑同步数据。

  • 仅增量同步:从指定的起点开始,持续同步新增的变化数据(增/删/改)

再次启动任务时,从上次执行结束的断点继续同步,如果断点状态丢失,则按照初次启动逻辑同步数据。

输出字段配置读取 Pulsar 消息中哪些信息传输给下游算子
  • 时间戳(publishTime):消息的时间戳,消息的创建时间或到达 Pulsar 的时间

  • 消息键(key):消息的键

  • 主题topic:消息的topic名称

  • 消息内容(value):消息的实际内容

根据选择的数据内容,生成二维表

3. 操作步骤编辑

例如希望将 Pulsar 中的数据实时同步并处理后输出至指定的数据库中。

3.1 设置 Pulsar 输入

进入 FineDataLink 数据平台后,选择「数据开发>实时任务」,新建实时任务,,如下图所示:

在任务管理界面选择「编辑」,如下图所示:

进入编辑界面后,拖入 Pulsar 输入算子,选择数据源并设置同步类型,输入 Topic 并设置输出字段,如下图所示:

点击「数据预览」即可看到取出的 Pulsar 数据,如下图所示:

3.2 解析数据

使用「JSON解析」将需要解析的 value 值解析成二维表字段,如下图所示:

3.3 数据输出

设置数据实时计算后输出至指定的数据库中,如下图所示:

3.4 任务启动和管理

点击启动,即可启动实时任务,同时在任务管理界面看到任务运行状态和运行详情,如下图所示: