1. 概述编辑
1.1 应用场景
在实际业务场景中,做数据增量更新的时候,有时候会碰到以下情况:
当表数据很大时,每次更新只有很小一部分数据,如果只保留一张全量表,则无法查询历史某个时间段表的全量记录。如果每天都保留一张全量表,则会增大存储压力。
这时候解决方案就是制作拉链表,拉链表就是在原表的基础上,新增两列,记录每条数据记录的生命周期开始和结束时间。
1.2 实现思路
ETL 任务数仓拉链表的整体构建思路为:目标表内的所有数据只增不减,对于来源表新增、删除、更新数据的处理如下:
来源表有记录更新,则在目标表中:插入更新的数据,原记录失效时间变化为新记录插入时间。
来源表数据有新增记录,则在目标表中:直接插入新增的数据,生效时间为当前插入时间,失效时间不封顶。
来源表有记录删除,则在目标表中:对该记录失效时间进行变更,可以表明该记录失效。
注:有些拉链表中,需要对数据增加版本号进行标记,目前 ETL 任务暂不支持。
2. 示例编辑
2.1 准备数据
1)数据库 dwTest1 中新建一张表 LLB_1,作为来源表,其主键为 ID,如下图所示:
建表语句如下:
-- ----------------------------
-- 创建表,其中ID为主键
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_1` (
`ID` int(11) NOT NULL,
`NAME` varchar(255) DEFAULT NULL,
`UNIT` varchar(255) DEFAULT NULL,
`LAST_UPDATE` datetime DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of LLB_1
-- ----------------------------
INSERT INTO `LLB_1` VALUES ('1002', '铜管', '5000', '2020-08-16 19:48:41');
INSERT INTO `LLB_1` VALUES ('1003', '螺丝', '1000', '2020-08-07 15:53:08');
INSERT INTO `LLB_1` VALUES ('1004', '木材', '25000', '2020-08-16 19:47:41');
2)数据库 dwTest2 中新建一张表 LLB_2,作为目标表,KEY_VALUES 为主键,自增序列(即行号),如下图所示:
建表语句如下:
-- ----------------------------
-- 创建表,其中KEY_VALUES为自增序列
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_2` (
`ODS_BEGIN_DATE` datetime DEFAULT NULL,
`ODS_END_DATE` datetime DEFAULT NULL,
`ID` int(11) DEFAULT NULL,
`NAME` varchar(255) DEFAULT NULL,
`UNIT` varchar(255) DEFAULT NULL,
`KEY_VALUES` int(11) AUTO_INCREMENT,
KEY (`KEY_VALUES`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- 插入三行数据到LLB_2表中
-- ----------------------------
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1001', '线圈', '1000',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1002', '铜管', '500',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1003', '螺丝', '1000',NULL);
2.2 场景模拟
上面目标表 LLB_2 中保留着来源表 LLB_1 上一次更新的数据,而当前来源表已经发生了如下改变:
新增了 1004 木材的记录
删除了 1001 线圈的记录
更改了 1002 铜管的 UNIT 为 5000
接下来要将来源表数据的变化,更新到目标表中,同时满足保留历史快照的要求。
2.3 整体流程
实现上述场景的 ETL 整体流程如下:
2.4 实现步骤
2.4.1 对删除的数据进行标记
1)创建一个 ETL 任务,将一个「参数赋值」节点拖到设计界面,并重命名为「获取来源表主键字段ID」。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取 LLB_1 表里 ID 字段的所有数据。
SQL 语句如下:
-- 获取来源表中ID的所有值
SELECT ID FROM LLB_1
2)将获取到的 ID 输出为参数,如下图设置,设置好后可使用参数预览先看看效果。
3)将一个「SQL脚本」拖到设计界面,重命名为「标记目标表中失效字段」,并用线条跟上游「获取来源表主键字段ID」节点连接。
点击该节点,如下图设置,SQL 语句的作用是:对目标表中有但来源表中没有的数据,也就是被删除的数据进行标记。
SQL 语句如下:
-- 筛除出目标表中存在但是来源表不存在,且尚未标记过失效的字段,进行标记(update)
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID NOT IN (${LLB_1_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
2.4.2 将新增数据写入目标表
1)将一个「参数赋值」节点拖到设计界面,并重命名为「获取目标表中所有ID值」,并用线条跟上游「标记目标表中失效字段」节点连接。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取 LLB_2 表里 ID 字段的所有数据。
SQL 语句如下:
-- 获取目标表中ID的所有值
SELECT ID FROM LLB_2
2)将获取到的 ID 输出为参数,如下图设置,设置好后可使用参数预览先看看效果。
注:此处要设置默认值,作用是下游节点可以预览数据和配置映射关系,但是运行任务时不会使用这些默认值。
3)将一个「数据同步」节点拖到设计界面,并重命名为「将新增的数据写入至目标表」,并用线条跟上游「获取目标表中所有ID值」节点连接。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表新增的数据
SQL 语句如下:
-- 获取来源表中存在但是目标表中不存在的字段,即新增的数据
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM LLB_1 WHERE ID NOT IN (${LLB_2_ID})
4)再设置这个数据同步节点的数据去向,将新增的数据写到目标表 LLB_2 中去。
5)字段映射采用「同名映射」,但是需要注意有两个字段名称不一致,需要手动调整下。
2.4.3 将更新的数据写入目标表
1)将一个「参数赋值」节点拖到设计界面,并重命名为「获取目标表最大时间戳」,并用线条跟上游「标记目标表中失效的字段」节点连接。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取目标表 LLB_2 中最近一次数据同步的时间。
SQL 语句如下:
SELECT MAX(ODS_BEGIN_DATE) FROM LLB_2
2)将取出来的最近一次同步时间输出为参数,如下图设置,可以使用参数预览先看看效果。
3)将一个「参数赋值」节点拖到设计界面,并重命名为「获取存在更新的字段ID」,并用线条跟上游「获取目标表中所有ID值」和「获取目标表最大时间戳」节点连接。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表 LLB_1 中存在更新的数据。
SQL 语句如下:
-- 获取来源表中存在更新的数据
-- 获取思路:在来源表中寻找更新时间戳大于目标表中最大更新时间戳,且在目标表中已经存在的字段行
SELECT * FROM LLB_1 WHERE LLB_1.LAST_UPDATE > '${LAST_UPDATE}' AND LLB_1.ID IN (${LLB_2_ID})
4)将更新过的 ID 输出为参数,如下图设置,可以使用参数预览先看看效果。
5)将一个「SQL脚本」拖到设计界面,重命名为「标为失效」,并用线条跟上游「获取存在更新的字段ID」节点连接。
点击该节点,如下图设置,SQL 语句的作用是:将更新数据对应的历史数据标记为失效。
SQL 语句如下:
-- 存在更新的数据插入目标表后,历史的数据就可以标记为失效了
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID IN (${UPDATE_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'
6)将一个「数据同步」节点拖到设计界面,并重命名为「将更新的数据写入至目标表」,并用线条跟上游「获取存在更新的字段ID」节点连接。
点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表中存在更新的数据行。
SQL 语句如下:
-- 在来源表中,获取存在更新的字段行
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM LLB_1
WHERE LLB_1.ID IN (${UPDATE_ID})
7)再设置这个数据同步节点的数据去向,将更新的数据写到目标表 LLB_2 中去。
8)字段映射采用「同名映射」,但是需要注意有两个字段名称不一致,需要手动调整下。
2.5 运行任务
2.2 节里面的数据变化在来源表中发生后,执行该 ETL 任务,最终目标表的结果如下图所示:
打开数据库 dwTest2 中的表 LLB_2:
1001 线圈的记录,标记为失效(ODS_END_DATE)
新增一条 1002 铜管数据,UNIT 为 5000,之前铜管的数据标记为失效(ODS_END_DATE)
新增一条 1004 木材的数据