1. 概述
1.1 应用场景
在 API取数中有时需要日期序列作为 body 值输入(例如 body 中使用 date 值按照日期依次取出当月1号到任务运行当天中每天的数据);
此时就需要构建自定义开始时间为当月1号,结束时间为任务运行当天的日期序列,如下图所示:
用户当然可以使用 SQL 语句构建日期序列,但 SQL 语句受数据库语法限制各有不同,不够通用,因此本文提供使用 SparkSQL 算子构建日期序列的通用方式。
1.2 实现思路
使用 Spark SQL 和 内置参数${cyctime} 设置「开始时间」和「结束时间」,「开始时间」默认为当月1号,「结束时间」默认为任务运行当天。
使用 Spark SQL 函数构建开始时间和结束时间之间日期序列并将其作为参数输出。
使用循环容器循环遍历输出的日期序列参数。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「使用SparkSQL构建日期列并循环取数1」。
2. 操作步骤
以需要将某接口数据中的时间数据按日同步取出为例进行取数介绍。
请求参数明细说明:
参数名 | 参数说明 | 备注 |
---|---|---|
dateFrom | 开始时间 | 推荐按天取数 即开始时间与结束时间为同一天 |
dateTo | 结束时间 |
2.1 设置日期序列开始时间和结束时间
新建定时任务后,新增「数据转换」节点,进入编辑界面后新增「SparkSQL」算子,输入语句设置日期序列开始时间和结束时间,如下图所示:
语句如下:
SELECT CONCAT(LEFT('${cyctime}',7),"-01") as s_date,LEFT('${cyctime}',10) as e_date
点击数据预览即可看到输出的日期列开始时间和结束时间,如下图所示:
新增「参数输出」算子,将设置的时间作为参数输出,便于后续使用开始时间和结束时间构建连续的日期列,如下图所示:
2.2 构建时间序列
新建定时任务后,新增「数据转换」节点,进入编辑界面后新增「SparkSQL」算子,输入语句创建日期序列,如下图所示:
语句如下:
SELECT explode(sequence(to_date('${s_date}'), to_date('${e_date}'), interval 1 day)) as date
点击数据预览即可看到创建好的日期序列,如下图所示:
新增「参数输出」算子,将生成的日期序列数据作为参数输出,便于后续使用循环容器循环遍历输出的日期序列参数,如下图所示:
2.3 循环容器使用日期序列
新增循环容器节点,并将「数据转换」拖入循环容器中,如下图所示:
进入数据转换中,使用 API 输入算子,输入地址并在 body 中使用 2.2 节输出的日期序列参数${xunhuanrongqi},也就是按照日期依次将每天的数据取至指定数据库,如下图所示:
解析数据并进行处理后将数据使用 DB表输出至指定数据库即可,如下图所示:
保存后选定循环容器节点,勾选日期序列参数,也就是在循环容器中将日期一天天依次遍历,值传递给 API 输入算子进行取数,如下图所示:
2.4 设置消息通知
设置消息通知,使用日期序列参数,将日期循环的参数传递通知给指定用户,如下图所示:
2.5 效果查看
运行任务,可以看到根据日期序列取出 API 中每天的数据,同时通知每次循环的情况,如下图所示: