历史版本2 :数仓拉链表(来源表数据小于10000) 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 应用场景

在实际业务场景中,做数据增量更新的时候,有时候会碰到以下情况:

当表数据很大时,每次更新只有很小一部分数据,如果只保留一张全量表,则无法查询历史某个时间段表的全量记录。如果每天都保留一张全量表,则会增大存储压力。

这时候解决方案就是制作拉链表,拉链表就是在原表的基础上,新增两列,记录每条数据记录的生命周期开始和结束时间。

1.2 实现思路

ETL 任务数仓拉链表的整体构建思路为:目标表内的所有数据只增不减,对于来源表新增、删除、更新数据的处理如下:

  • 来源表有记录更新,则在目标表中:插入更新的数据,原记录失效时间变化为新记录插入时间。

  • 来源表数据有新增记录,则在目标表中:直接插入新增的数据,生效时间为当前插入时间,失效时间不封顶。

  • 来源表有记录删除,则在目标表中:对该记录失效时间进行变更,可以表明该记录失效。

注:有些拉链表中,需要对数据增加版本号进行标记,目前 ETL 任务暂不支持。

2. 示例编辑

2.1 准备数据

1)数据库 dwTest1 中新建一张表 LLB_1,作为来源表,其主键为 ID,如下图所示:

2.png

建表语句如下:

-- ----------------------------
-- 创建表,其中ID为主键
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_1` (
  `ID` int(11) NOT NULL,
  `NAME` varchar(255) DEFAULT NULL,
  `UNIT` varchar(255) DEFAULT NULL,
  `LAST_UPDATE` datetime DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
-- ----------------------------
-- Records of LLB_1
-- ----------------------------
INSERT INTO `LLB_1` VALUES ('1002', '铜管', '5000', '2020-08-16 19:48:41');
INSERT INTO `LLB_1` VALUES ('1003', '螺丝', '1000', '2020-08-07 15:53:08');
INSERT INTO `LLB_1` VALUES ('1004', '木材', '25000', '2020-08-16 19:47:41');

2)数据库 dwTest2 中新建一张表 LLB_2,作为目标表,KEY_VALUES 为主键,自增序列(即行号),如下图所示:

3.png

建表语句如下:

-- ----------------------------
-- 创建表,其中KEY_VALUES为自增序列
-- ----------------------------
CREATE TABLE IF NOT EXISTS `LLB_2` (
  `ODS_BEGIN_DATE` datetime DEFAULT NULL,
  `ODS_END_DATE` datetime DEFAULT NULL,
  `ID` int(11) DEFAULT NULL,
  `NAME` varchar(255) DEFAULT NULL,
  `UNIT` varchar(255) DEFAULT NULL,
  `KEY_VALUES` int(11) AUTO_INCREMENT,
   KEY (`KEY_VALUES`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
-- ----------------------------
-- 插入三行数据到LLB_2表中
-- ----------------------------
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1001', '线圈', '1000',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1002', '铜管', '500',NULL);
INSERT INTO `LLB_2` VALUES ('2020-08-07 15:53:08', '2199-12-20 23:59:59', '1003', '螺丝', '1000',NULL);

2.2 场景模拟

上面目标表 LLB_2 中保留着来源表 LLB_1 上一次更新的数据,而当前来源表已经发生了如下改变:

  • 新增了 1004 木材的记录

  • 删除了 1001 线圈的记录

  • 更改了 1002 铜管的 UNIT 为 5000

接下来要将来源表数据的变化,更新到目标表中,同时满足保留历史快照的要求。

2.3 整体流程

实现上述场景的 ETL 整体流程如下:

4.png

2.4 实现步骤

2.4.1 对删除的数据进行标记

1)创建一个 ETL 任务,将一个参数赋值节点拖到设计界面,并重命名为获取来源表主键字段ID」。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取 LLB_1 表里 ID 字段的所有数据。

5.png

SQL 语句如下:

-- 获取来源表中ID的所有值
SELECT ID FROM LLB_1

2)将获取到的 ID 输出为参数,如下图设置,设置好后可使用参数预览先看看效果。

6.png

3)将一个SQL脚本」拖到设计界面,重命名为标记目标表中失效字段」,并用线条跟上游获取来源表主键字段ID节点连接。

点击该节点,如下图设置,SQL 语句的作用是:对目标表中有但来源表中没有的数据,也就是被删除的数据进行标记。

7.png

SQL 语句如下:

-- 筛除出目标表中存在但是来源表不存在,且尚未标记过失效的字段,进行标记(update)
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID NOT IN (${LLB_1_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'

2.4.2 将新增数据写入目标表

1)将一个参数赋值节点拖到设计界面,并重命名为获取目标表中所有ID值」,并用线条跟上游标记目标表中失效字段节点连接。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取 LLB_2 表里 ID 字段的所有数据。

8.png

SQL 语句如下:

-- 获取目标表中ID的所有值
SELECT ID FROM  LLB_2

2)将获取到的 ID 输出为参数,如下图设置,设置好后可使用参数预览先看看效果。

注:此处要设置默认值,作用是下游节点可以预览数据和配置映射关系,但是运行任务时不会使用这些默认值。

9.png    3)将一个数据同步节点拖到设计界面,并重命名为将新增的数据写入至目标表」,并用线条跟上游「获取目标表中所有ID值」节点连接。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表新增的数据

10.png

SQL 语句如下:

-- 获取来源表中存在但是目标表中不存在的字段,即新增的数据
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM LLB_1  WHERE ID NOT IN (${LLB_2_ID})

4)再设置这个数据同步节点的数据去向,将新增的数据写到目标表 LLB_2 中去。

11.png

5)字段映射采用同名映射,但是需要注意有两个字段名称不一致,需要手动调整下。

12.png

2.4.3 将更新的数据写入目标表

1)将一个参数赋值节点拖到设计界面,并重命名为获取目标表最大时间戳」,并用线条跟上游「标记目标表中失效的字段」节点连接。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取目标表 LLB_2 中最近一次数据同步的时间。

13.png

SQL 语句如下:

SELECT MAX(ODS_BEGIN_DATE) FROM LLB_2

2)将取出来的最近一次同步时间输出为参数,如下图设置,可以使用参数预览先看看效果。

14.png

3)将一个参数赋值节点拖到设计界面,并重命名为获取存在更新的字段ID」,并用线条跟上游「获取目标表中所有ID值」和「获取目标表最大时间戳」节点连接。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表 LLB_1 中存在更新的数据。

15.png

SQL 语句如下:

-- 获取来源表中存在更新的数据
-- 获取思路:在来源表中寻找更新时间戳大于目标表中最大更新时间戳,且在目标表中已经存在的字段行
SELECT *  FROM  LLB_1 WHERE LLB_1.LAST_UPDATE  > '${LAST_UPDATE}' AND LLB_1.ID IN (${LLB_2_ID})

4)将更新过的 ID 输出为参数,如下图设置,可以使用参数预览先看看效果。

16.png

5)将一个SQL脚本」拖到设计界面,重命名为「标为失效」,并用线条跟上游「获取存在更新的字段ID」节点连接。

点击该节点,如下图设置,SQL 语句的作用是:将更新数据对应的历史数据标记为失效。

17.png

SQL 语句如下:

-- 存在更新的数据插入目标表后,历史的数据就可以标记为失效了
UPDATE LLB_2 SET LLB_2.ODS_END_DATE = NOW()
WHERE LLB_2.ID IN (${UPDATE_ID}) AND LLB_2.ODS_END_DATE = '2199-12-20 23:59:59'

6)将一个数据同步节点拖到设计界面,并重命名为将更新的数据写入至目标表」,并用线条跟上游「获取存在更新的字段ID」节点连接。

点击该节点,如下图设置数据来源,SQL 语句的作用是:获取来源表中存在更新的数据行。

18.png

SQL 语句如下:

-- 在来源表中,获取存在更新的字段行
SELECT *,'2199-12-20:23:59' AS ODS_END_DATE FROM  LLB_1
WHERE LLB_1.ID IN (${UPDATE_ID})

7)再设置这个数据同步节点的数据去向,将更新的数据写到目标表 LLB_2 中去。

19.png

8)字段映射采用同名映射,但是需要注意有两个字段名称不一致,需要手动调整下。

20.png

2.5 运行任务

2.2 节里面的数据变化在来源表中发生后,执行该 ETL 任务,最终目标表的结果如下图所示:

打开数据库 dwTest2 中的表 LLB_2:

  • 1001 线圈的记录,标记为失效(ODS_END_DATE)

  • 新增一条 1002 铜管数据,UNIT 为 5000,之前铜管的数据标记为失效(ODS_END_DATE)

  • 新增一条 1004 木材的数据

1647499262776883.png