1. 概述
1.1 应用场景
在实际业务场景中,做数据增量更新的时候,有时候会碰到以下情况:
当表数据很大时,每次更新只有很小一部分数据,如果只保留一张全量表,则无法查询历史某个时间段表的全量记录。如果每天都保留一张全量表,则会增大存储压力。
这时候解决方案就是制作拉链表,拉链表就是在原表的基础上,新增两列记录每条数据记录的生命周期开始和结束时间。
1.2 思路
注:若来源表数据小于 10000 ,可参考本文方案;若来源表数据大于 10000,可参考:数仓拉链表(来源表数据大于10000)
定时任务数仓拉链表的整体构建思路为:目标表内的所有数据只增不减,对于来源表新增、删除、更新数据的处理如下:
来源表有数据更新,则在目标表中:插入更新的数据,原记录失效时间变化为新记录插入时间。
来源表数据有新增记录,则在目标表中:直接插入新增的数据,生效时间为当前插入时间,失效时间不封顶。
来源表有数据删除,则在目标表中:对该记录失效时间进行变更,表明该记录失效。
任务 DEMO 详情参见:
官方demo:https://demo.finedatalink.com/ 「数仓拉链表(来源表数据小于10000)」
2. 实现方法
2.1 场景说明
数据库 demotest 中新建一张表 LLB_1,作为来源表,其主键为 ID,如下图所示:
数据库 demotest2 中新建一张表 LLB_2,作为目标表,KEY_VALUES 为主键,自增序列(即行号),如下图所示:
目标表 LLB_2 中保留着来源表 LLB_1 上一次更新的数据,而当前来源表已经发生了如下改变:
删除了 1001 线圈的记录
新增了 1004 木材的记录
更改了 1002 铜管的 UNIT 为 5000
接下来要将来源表数据的变化,更新到目标表中,同时满足保留历史快照的要求。
2.2 方案说明
在 FineDataLink 中任务设计如下图所示:
3. 操作步骤
3.1 对删除的数据进行标记
1)首先需要获取来源表 LLB_1 里 ID 字段的所有数据,便于后续将其作为参数,与目标表数据进行对比,找到来源表删除的数据。
创建一个定时任务,将一个「参数赋值」节点拖到设计界面,并重命名为「获取来源表主键字段ID」,输入 SQL 语句,如下图所示:
-- 获取来源表中ID的所有值
SELECT ID FROM `demotest`.`LLB_1`
2)将获取到的 ID 设置为参数,如下图设置,设置好后可使用参数预览先看看效果。
3)接着需要对目标表 LLB_2 中存在但来源表 LLB_1 中不存在,并且尚未标记过失效(ODS_END_DATE 时间为 2199-12-20 23:59:59)的被删除的数据进行标记。
将一个「SQL脚本」拖到设计界面,重命名为「标记目标表中失效字段」,并用线条跟上游「获取来源表主键字段ID」节点连接。
输入 SQL 语句,如下图所示:
-- 筛除出目标表中存在但是来源表不存在,且尚未标记过失效的字段,进行标记(update)
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID NOT IN (${LLB_1_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
此步骤目标表中 1001 线圈数据(也就是在来源表中已经删除的数据)的更新时间会被改为当前任务执行的时间,即标记其已经失效,如下图所示:
3.2 将新增数据写入目标表
1)获取目标表 LLB_2 里 ID 字段的所有数据,便于后续将其作为参数,与来源表LLB_1数据进行对比,找到来源表新增的数据。
将一个「参数赋值」节点拖到设计界面,并重命名为「获取目标表中所有ID值」,并用线条跟上游「标记目标表中失效字段」节点连接。输入 SQL 语句获取 LLB_2 表里 ID 字段的所有数据,如下图所示:
-- 获取目标表中ID的所有值
SELECT ID FROM `demotest2`.`LLB_2`
2)将获取到的 ID 输出为参数,如下图设置,设置好后可使用参数预览先看看效果。
注:此处要设置调试值,作用是下游节点可以预览数据和配置映射关系。
3)接着需要对来源表 LLB_1 中存在但目标表 LLB_2 中不存在,也就是新增的数据写入目标表 LLB_2。
将一个「数据同步」节点拖到设计界面,并重命名为「将新增的数据写入至目标表」,并用线条跟上游「获取目标表中所有ID值」节点连接。
写入 SQL 语句获取来源表新增的数据,如下图所示:
-- 获取来源表中存在但是目标表中不存在的字段,即新增的数据
SELECT *,'2199-12-20 23:59:59' AS
ODS_END_DATE FROM `demotest`.`LLB_1`
WHERE ID NOT IN (${LLB_2_ID})
4)再设置这个数据同步节点的数据去向与映射,将新增的数据写到目标表 LLB_2 中去,由于来源表和目标表的字段有一些不一致,因此在写入的时候,字段映射采用「同名映射」,同时需要手动调整对应目标字段。如下图所示:
字段映射关系详情参见:设置字段映射 。
5)写入方式选择「直接将数据写入目标表」。
此步骤目标表中会新增 1004 木材数据(也就是在来源表中新增的数据),如下图所示:
3.3 将更新的数据写入目标表
3.3.1 找到更新的数据
1)获取目标表 LLB_2 里 ODS_BEGIN_DATE 最晚时间,也就是最近一次数据同步的时间,便于后续将其作为参数,与来源表LLB_1 数据进行对比,找到来源表更新的数据。
将一个「参数赋值」节点拖到设计界面,并重命名为「获取目标表最大时间戳」,并用线条跟上游「标记目标表中失效的字段」节点连接,输入 SQL 语句,获取目标表 LLB_2 中最近一次数据同步的时间,如下图所示:
SELECT MAX(ODS_BEGIN_DATE) FROM `demotest2`.`LLB_2`
2)将取出来的最近一次同步时间输出为参数,如下图设置,可以使用参数预览先看看效果。
3)此时即可根据最近一次更新时间获取来源表 LLB_1 中存在更新的数据。
将一个「参数赋值」节点拖到设计界面,并重命名为「获取存在更新的字段ID」,并用线条跟上游「获取目标表中所有ID值」和「获取目标表最大时间戳」节点连接。输入 SQL 语句如下图所示:
-- 获取来源表中存在更新的数据
-- 获取思路:在来源表中寻找更新时间戳大于目标表中最大更新时间戳,且在目标表中已经存在的字段行
SELECT * FROM `demotest`.`LLB_1`
WHERE LLB_1.LAST_UPDATE > '${LAST_UPDATE}' AND LLB_1.ID IN (${LLB_2_ID})
点击数据预览即可看到来源表中有数据更新的数据 1002 铜管数据,如下图所示:
4)将更新过的 ID 输出为参数,如下图设置,可以使用参数预览先看看效果。
3.3.2 将更新数据对应的历史数据标记为失效
找到来源表更新的数据,然后即可将更新数据对应的历史数据标记为失效。
将一个「SQL脚本」拖到设计界面,重命名为「标为失效」,并用线条跟上游「获取存在更新的字段ID」节点连接,输入 SQL 语句,如下图所示:
-- 存在更新的数据插入目标表后,历史的数据就可以标记为失效了
UPDATE `demotest2`.`LLB_2`
SET LLB_2.ODS_END_DATE = NOW()WHERE LLB_2.ID IN (${UPDATE_ID})
AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
此步骤,目标表中会将来源表更新的数据对应的历史数据标记为失效,即结束时间由之前的 2199-12-20 23:59:59 改为任务更新的时间,如下图所示:
3.3.3 将新数据写入目标表
1)找到来源表更新的数据,可以将更新数据作为一条新的数据写入目标数据表。
将一个「数据同步」节点拖到设计界面,并重命名为「将更新的数据写入至目标表」,并用线条跟上游「获取存在更新的字段ID」节点连接,然后写入 SQL 语句,因为新写入的数据并没有失效,所以需要将 2199-12-20 23:59:59 写入目标数据表的 ODS_END_DATE 字段中。
-- 在来源表中,获取存在更新的字段行
SELECT *,'2199-12-20 23:59:59' AS ODS_END_DATE
FROM `demotest`.`LLB_1`WHERE LLB_1.ID IN (${UPDATE_ID})
2)再设置这个数据同步节点的数据去向,将更新的数据写到目标表 LLB_2 中去,由于来源表和目标表的字段有一些不一致,因此在写入的时候,字段映射采用「同名映射」,同时需要手动调整对应目标字段,如下图所示:
3)设置写入方式为「直接将数据写入目标表」。
此步骤目标表中会新增 1002 铜管数据(也就是在来源表中更新的数据),作为一条新增的数据写入目标表,同时结束时间显示为 2199-12-20 23:59:59,即未失效,如下图所示:
3.4 运行任务
执行该定时任务。
4. 效果查看
更新后的目标数据表中会有如下变化:
已经删除的 1001 线圈的记录,标记为失效(即 ODS_END_DATE 时间更新为当前时间)
新增一条 1004 木材的数据,ODS_END_DATE 时间设置为 2199-12-20 00:00:00 即未失效
新增一条 1002 铜管数据,ODS_END_DATE 时间设置为 2199-12-20 00:00:00 即未失效,UNIT 为 5000;同时,之前铜
管的数据标记为失效(即 ODS_END_DATE 时间更新为当前时间)