1. 概述
1.1 应用场景
用户使用「JSON生成」算子生成多条 JSON 语句写入 API 时,每条 JSON 语句会触发一次 API 请求;当数据量较大时(如 1000 条),会产生等量的 API 调用,极易触发接口的频率限制,导致任务失败。

1.2 实现思路
1)假设每次写入 100 条数据,计算来源数据要写入的次数,作为分页参数。
2)循环容器节点循环方式为条件循环,执行条件为${loopTimes}<=${分页}
3)循环容器内:
「DB表输入」算子每次读取 100 条数据,直至读取所有数据。
使用「JSON生成」算子和「Spark SQL」算子,将待写入的多条数据拼接,批量写入接口。
注:本文方案,适用于支持批量写入的接口。

1.3 任务展示
Demo 示例详情请参见:https://demo.finedatalink.com/ 大数据量循环批量写入API接口。
2. 操作步骤
2.1 准备工作
准备一个支持批量写入的接口。本文示例的接口是使用 数据接收 功能发布的接口:

示例 API 的JSON 结构如下:
[{
"ID": 19.0,
"姓名": "杜浩",
"工资": 1200.0,
"出生日期": 25196.0,
"入职时间": 33785.0,
"籍贯": "山东",
"性别": "男",
"部门": "人力资源部",
"name": null
}, {
"ID": 24.0,
"姓名": "彭勃",
"工资": 3600.0,
"出生日期": 25154.0,
"入职时间": 32927.0,
"籍贯": "青海",
"性别": "男",
"部门": "人力资源部",
"name": null
}]
2.2 计算要写入的次数并输出为参数
1)新建定时任务,拖入「参数赋值」节点。
2)假设每次写入 100 条数据(根据实际情况设置,每次写入的数据条数需要小于等于接口支持写入的最大数据量),计算出要写入的次数,并输出为参数。
注:下面 SQL 语句以 MySQL 数据库语法为例,其他数据库需要根据语法自行修改。
select ceil(count(`ID`)/100) as a from `demo1`.`人员信息表`

2.3 设置循环容器
拖入「循环容器」节点,循环执行模式为「条件循环」,执行条件设置为${loopTimes}<=${分页}。如下图所示:

2.4 循环容器内设置
2.4.1 输出分页参数
本文示例中,我们取数时会使用 OFFSET 参数跳过指定数量的行,再配合 LIMIT 实现分页。

所以需要先计算出每次循环使用的 OFFSET 值。
「循环容器」内拖入「参数赋值」节点,使用内置参数 loopTimes 计算 OFFSET 值,并输出为参数。如下图所示:
注:OFFSET 起点设置为0,表示从第一条数据开始取数。
SELECT 100*(${loopTimes}-1) as offset

2.4.2 接口每次写入多条数据
1)「参数赋值」节点后拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,每次读取 100 条数据,直至到达循环结束条件。
注:需要使用 ORDER BY 语句保证返回结果的顺序,ORDER BY 后字段建议是主键或唯一索引。
SELECT * FROM `demo1`.`人员信息表`
ORDER BY `ID`
LIMIT 100 OFFSET ${offset}

点击「数据预览」,可看到取出的数据。如下图所示:

3)拖入「JSON生成」算子,生成 API 接口需要的数据。如下图所示:

点击「数据预览」,可看到生成了 100 条 JSON 格式的数据。如下图所示:

若此时调用接口写入数据,将调用 100 次接口。
4)拖入「Spark SQL」算子,拼接所有的 JSON 语句。
注:SQL 语句中的「JSON生成」为上游算子的名称,需要点击输入。
SELECT
CONCAT(
'[',
CONCAT_WS(',', collect_list(JSON生成.`JSON_DATA`)),
']'
) AS json_array
FROM JSON生成
点击「数据预览」,如下图所示:

5)拖入「API输出」算子,将数据写入到 API 接口中。如下图所示:

2.5 效果查看
运行成功后,可看到「循环容器」会根据分页次数执行对应的循环次数,并且每次动态修改 OFFSET 起点,最终实现全量写入接口。

