1. 概述
1.1 应用场景
在实际业务场景中,做数据增量更新的时候,有时候会碰到以下情况:
当表数据很大时,每次更新只有很小一部分数据,如果只保留一张全量表,则无法查询历史某个时间段表的全量记录。如果每天都保留一张全量表,则会增大存储压力。
这时候解决方案就是制作拉链表,拉链表就是在原表的基础上,新增两列,记录每条数据记录的生命周期开始和结束时间。
1.2 实现思路
ETL 任务数仓拉链表的整体构建思路为:目标表内的所有数据只增不减,对于来源表新增、删除、更新数据的处理如下:
来源表有记录更新,则在目标表中:插入更新的数据,原记录失效时间变化为新记录插入时间。
来源表数据有新增记录,则在目标表中:直接插入新增的数据,生效时间为当前插入时间,失效时间不封顶。
来源表有记录删除,则在目标表中:对该记录失效时间进行变更,可以表明该记录失效。
注:有些拉链表中,需要对数据增加版本号进行标记,目前 ETL 任务暂不支持。
2. 示例
2.1 准备数据
1)数据库 dwTest1 中新建一张表 LLB_1,作为来源表,其主键为 ID,如下图所示:
建表语句如下:
-- ----------------------------
-- 创建表,其中ID为主键
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_1` (
`ID` int(11) NOT NULL,
`NAME` varchar(255) DEFAULT NULL,
`UNIT` varchar(255) DEFAULT NULL,
`LAST_UPDATE` datetime DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of LLB_1
-- ----------------------------
INSERT INTO `LLB_1` VALUES ('1002', '铜管', '5000', '2020-08-16 19:48:41');
INSERT INTO `LLB_1` VALUES ('1003', '螺丝', '1000', '2020-08-07 15:53:08');
INSERT INTO `LLB_1` VALUES ('1004', '木材', '25000', '2020-08-16 19:47:41');
2)数据库 dwTest2 中新建一张表 LLB_2,作为目标表,KEY_VALUES 为主键,自增序列(即行号),如下图所示:
建表语句如下:
-- ----------------------------
-- 创建表,其中KEY_VALUES为自增序列
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_2` (
`ODS_BEGIN_DATE` datetime DEFAULT NULL,
`ODS_END_DATE` datetime DEFAULT NULL,
`ID` int(11) DEFAULT NULL,
`NAME` varchar(255) DEFAULT NULL,
`UNIT` varchar(255) DEFAULT NULL,
`KEY_VALUES` int(11) AUTO_INCREMENT,
KEY (`KEY_VALUES`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- 插入三行数据到LLB_2表中
-- ----------------------------
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1001', '线圈', '1000',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1002', '铜管', '500',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1003', '螺丝', '1000',NULL);
2.2 场景模拟
上面目标表 LLB_2 中保留着来源表 LLB_1 上一次更新的数据,而当前来源表已经发生了如下改变:
新增了 1004 木材的记录
删除了 1001 线圈的记录
更改了 1002 铜管的 UNIT 为 5000
接下来要将来源表数据的变化,更新到目标表中,同时满足保留历史快照的要求。
2.3 整体流程
实现上述场景的 ETL 整体流程如下:
2.4 实现步骤
首先按照下图把节点拖到设计界面,给这些节点重命名,并将节点连接起来。
2.4.1 对删除的数据进行标记
1)双击参数赋值节点「获取来源表主键字段ID」,如下设置,获取来源表中可作为比对字段的字段值,作为参数输出到下游。
用到的 SQL 语句如下:
-- 获取来源表中ID的所有值
SELECT ID FROM LLB_1
2)双击SQL脚本节点「标记目标表中失效字段」,基于上游传下来的参数,用 not in 语法,筛选出目标表中存在,但是来源表中不存在的字段。
对这些筛选出来的字段所在行,在目标表中做 UPDATE ,更新 END_DATE 值。,即标记这个字段失效。
SQL 语句为:
-- 筛除出目标表中存在但是来源表不存在,且尚未标记过失效的字段,进行标记(update)
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID NOT IN (${LLB_1_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
2.4.2 将新增数据写入目标表
1)双击参数赋值节点「获取目标表中所有ID值」,如下设置,获取目标表中可作为比对字段的字段值,作为参数输出到下游。
用到的 SQL 语句为:
-- 获取目标表中ID的所有值
SELECT ID FROM LLB_2
2)双击离线同步节点「将新增的数据写入至目标表」,如下设置,获取来源表中存在,目标表中不存在的数据,将其写入至目标表中。
用到的 SQL 语句如下:
-- 获取来源表中存在但是目标表中不存在的字段,即新增的数据
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM LLB_1 WHERE ID NOT IN (${LLB_2_ID})
2.4.3 将更新的数据写入目标表
1)双击参数赋值节点「获取目标表最大时间戳」,如下设置,获取目标表中最大的时间戳,输出为参数。
注:这一步要在步骤 2.4.2 完成前做,否则获取到的最大时间戳就不是上次同步的时间戳了。
用到的 SQL 语句如下:
SELECT MAX(ODS_BEGIN_DATE) FROM LLB_2
2)双击参数赋值节点「获取存在更新的字段ID」,如下设置,获取来源表中存在更新的数据。
用到的 SQL 语句如下:
-- 获取来源表中存在更新的数据
-- 获取思路:在来源表中寻找更新时间戳大于目标表中最大更新时间戳,且在目标表中已经存在的字段行
SELECT * FROM LLB_1 WHERE LLB_1.LAST_UPDATE > '${LAST_UPDATE}' AND LLB_1.ID IN (${LLB_2_ID})
3)双击SQL脚本节点「标为失效」,对于来源表中已经更新的数据,在目标表中的数据将其标记为失效。
SQL 语句如下:
-- 存在更新的数据插入目标表后,历史的数据就可以标记为失效了
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID IN (${UPDATE_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
4)双击离线同步节点「将更新的数据写入目标表」,如下设置,将来源表中存在更新的数据,抽取至目标表中。
用到的 SQL 语句如下:
-- 在来源表中,获取存在更新的字段行
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM LLB_1
WHERE LLB_1.ID IN (${UPDATE_ID})
字段映射关系注意下第四条:
2.5 效果查看
2.2 节里面的数据变化在来源表中发生后,执行该 ETL 任务,最终目标表的结果如下图所示:
打开数据库 dwTest2 中的表 LLB_2:
1001 线圈的记录,标记为失效(ODS_END_DATE)
新增一条 1002 铜管数据,UNIT 为 5000,之前铜管的数据标记为失效(ODS_END_DATE)
新增一条 1004 木材的数据