1. 概述
1.1 应用场景
当前上游数仓(由第三方搭建,非 FDL 搭建)无法保证每日在固定时间完成数据同步。若 BI 数据集采用定时抽取机制,可能获取到未更新的历史数据。
用户希望上游数仓更新后,再更新 BI 数据集,而不是只能主观设置一个时间点来开始。
1.2 实现思路
使用定时任务来监控数仓表是否完成更新,如果已经完成更新,调用 BI 数据集更新接口,触发 BI 公共数据集进行更新。
注:本方案的前提是数仓表有标识列能识别出数据已经完成更新。
2. 操作步骤
2.1 场景模拟
数仓来源表:

数仓目标表:

BI数据集:
![]()
从上图我们可以得出,数仓表已更新完成,但 BI 数据集尚未更新。
2.2 方案说明

通过对比数仓来源表的最大时间戳、数仓目标表的最近一次更新时间、BI 数据集的最近一次更新时间,确认是否数仓是否完成更新、BI 数据集是否进行过更新。
1)获取三个表的时间进行比较
获取数仓来源表最大时间戳,并设置为参数 max_order_timestamp。
获取数仓目标表的最近一次更新时间,并设置为参数 max_etl_time。
获取BI数据集的最近一次更新时间,并设置为参数max_BI_etl_time。
3 个时间做比较,当max_etl_time>=max_order_timestamp且max_etl_time>=max_BI_etl_time的时候,说明数仓完成更新,且 BI 未更新,此时需要触发 BI 更新。
2)调用 BI 数据集更新接口触发更新。
2.3 确定 BI 数据集是否已更新
2.3.1 获取三个时间值
1)新建定时任务,拖入「参数赋值」节点,获取数仓目标表的最近一次更新时间,输出为参数max_etl_time。如下图所示:

2)再拖入「参数赋值」节点,获取数仓来源表最大时间戳,输出为参数max_order_timestamp。如下图所示:

3)再拖入「参数赋值」节点,获取BI数据集的最近一次更新时间,输出为参数max_BI_etl_time。如下图所示:
__FR_BI_SERVER_SQL___更新信息日志:可获取数据集更新时间。
远程公共数据连接详情说明请参见:配置Fine+工程数据源
用户也可新建一张表,给出一个 BI 数据集的更新时间,可在后续 2.4.7 节步骤中,每次任务执行后更新该时间。

2.3.2 判断 BI 数据集是否更新
当max_etl_time>=max_order_timestamp且max_etl_time>=max_BI_etl_time的时候,说明数仓完成更新,且 BI 未更新,此时需要触发 BI 更新。
拖入「条件分支」节点,再拖入一个「数据转换」节点和一个「虚拟节点」,若 BI 数据集未更新,执行「数据转换」节点所在的分支,若 BI 数据集已更新或者数仓未同步,执行虚拟节点。

2.4 更新BI数据集

更新 BI 数据集需要调用的接口是 触发单表/文件夹更新 接口,该接口调用时需要填入:
请求参数 info :参数值格式为JSON,使用时需要进行编码 encodeURIComponent()。
fine_auth_token:需要传递 token 参数。
2.4.1 获取 token
进入「数据转换」节点,拖入「API输入」算子,调用 前台单点登录接口,获取 token。如下图所示:

由于获取到的数据非标准的 JSON 格式,不便于解析,所以需要处理下。
2.4.2 获取标准 JSON 格式
1)拖入「新增计算列」算子,新增一个字段,去除 2.4.1 节结果中的 callback 字符串及()。
MID(responseBody,FIND("(",responseBody)+1,FIND(")",responseBody)-FIND("(",responseBody)-1),responseBody为点击输入。

2)点击「数据预览」,可看到新增的字段是标准的 JSON 格式。

2.4.3 解析数据
拖入「JSON解析」算子,解析字段,获取accessToken、status字段值。如下图所示:

2.4.4 输出为参数
拖入「参数输出」算子,将accessToken、status字段值输出为参数。
accessToken:调用 BI 数据集更新接口时需要用到。
status:异常情况下,token 可能获取不成功,需要推送消息通知。

2.4.5 判断 token 是否获取成功
根据 status 参数的值,判断 token 是否获取成功,若获取成功,调用更新 BI 数据集接口,若获取失败,给专人进行通知。
1)拖入「条件分支」节点、「数据转换」节点、「消息通知」节点,若 status 参数值为成功,执行「数据转换」节点所在的分支,否则,执行「消息通知」节点。

2)若 token 获取失败,可通知给对应负责人。

2.4.6 对需要更新的数据集进行编码
更新 BI 数据集需要调用的接口是 触发单表/文件夹更新 接口,该接口调用时需要填写 info 参数。

「文件夹ID」获取方法:点击BI数据集所在的文件夹,在标签页中可看到文件夹ID。

1)进入「数据转换1」节点,拖入「DB表输入」算子,为生成 info 参数做准备。

点击「数据预览」,如下图所示:

2)拖入 Spark SQL 算子,对 info 参数进行编码。

3)拖入「参数输出」算子,将编码后的参数输出。

2.4.7 调用 BI 数据集更新接口
1)拖入「数据转换」节点,进入「数据转换」节点后,拖入API输入算子,调用 触发单表/文件夹更新 接口。

2)拖入「新增计算列」算子,输出当前时间和BI数据集名称:
注:该步骤和后续步骤可不做,API输入算子后可后接一个参数输出算子,任意输出一个参数即可,用户根据实际情况判断是否需要一张表记录BI数据集的更新时间和数据集名称。
触发更新时间:可作为 BI 数据集更新时间,该字段可在 2.3.1 节用到。
表名:记录BI数据集的名称。

3)拖入「DB表输出」算子,将当前时间和BI数据集名称输出到一张数据库表中。

2.5 效果查看
可为该定时任务设置执行频率,比如每五分钟执行一次。
当数仓目标表同步完成,且BI数据集未更新时,会更新BI数据集。

