1. 概述
1.1 应用场景
数仓拉链表 文档中提供的方案,使用「参数赋值」节点将来源表中的 ID 字段输出为参数,但「参数赋值」节点要求输出的参数不能超过 10000 个,若来源表数据超过 10000 条,该方案将不适用。
本文提供数据量较大场景(来源表数据超过 10000)下拉链表的实现方案。
注:4.1.10.2以及之后的版本支持循环容器循环次数无限制插件输出参数时将没有参数个数限制,可输出无限参数个数,若使用此插件,则数仓拉链表来源数据超过 10000也可使用数仓拉链表(来源表数据小于10000)。
1.2 实现思路
1)将来源表数据同步到中间表中,便于对来源表字段做操作。
2)标记中间表和拉链表共有字段,便于后续数据关联后区分字段来源。
3)将中间表与拉链表全外连接:
注:下图示例中,来源表主键字段为ID,拉链表中也包含ID字段。
可看出:
来源表存在 ID,拉链表无 ID,代表新增的数据(上图红色标记的数据)。
来源表无 ID,拉链表存在 ID ,代表删除的数据(上图紫色标记的数据)。
来源表ID=拉链表ID,其他字段不一致,代表更新的数据(上图黄色标记的数据)。
4)将新增、更新、删除数据标记。
5)不同类型的数据,根据不同策略输入到拉链表中(具体参考本文 2.2 节)。
任务Demo详情参见:官方demo:https://demo.finedatalink.com/ 中的定时任务:数仓拉链表(来源表数据大于10000)
2. 实现方法
2.1 场景模拟
AUTO_INCREMENT_ID 字段为拉链表的主键,为自增字段。
来源表说明:
更新苹果的数据。
新增橙子的数据。
删除火龙果的数据。
接下来要将来源表数据的变化,更新到拉链表中,同时满足保留历史快照的要求。
2.2 方案说明
数仓拉链表 文档中,来源表中存在更新时间戳字段,拉链表中新增数据的开始时间为来源表的时间戳字段。
但很多场景中,用户的来源库中没有更新时间戳字段,或者更新时间戳字段不准确。针对该场景,本文提供方案。
注:用户来源表中若存在更新时间戳字段,且该字段准确,可用该字段代替「新增数据的开始时间」。
设计逻辑(以本文场景的拉链表为例):
场景 | AUTO_INCREMENT_ID | ID | NAME | UNIT | START_TIME | END_TIME |
---|---|---|---|---|---|---|
新增 | 自增 | 新 | 新 | 新 | 1900-01-01 可替换为来源表的时间戳字段 | 9999-12-31 |
删除 | 不变 | 老 | 老 | 老 | 老 | 执行本文 3.2.3 节步骤的时间,值为now() |
更新-旧数据更新 | 不变 | 老 | 老 | 老 | 老 | 执行本文 3.2.3 节步骤的时间,值为now() |
更新-新数据插入 | 自增 | 新 | 新 | 新 | 执行本文 3.2.3 节步骤的时间,值为now() 可替换为来源表的时间戳字段 | 9999-12-31 |
所以 2.1 节场景中,拉链表应该:
更新数据:对已存在的苹果数据,END_TIME 更新为执行本文 3.2.3 节步骤的时间;将来源表中的苹果数据,新增到拉链表中,START_TIME 为执行本文 3.2.3 节步骤的时间,END_TIME 为9999-12-31。拉链表中将存在两条苹果数据,一条为失效数据,一条为最新数据。
新增数据:新增橙子的数据,START_TIME 为1900-01-01(由于来源表没有时间戳字段,此为自定义的时间),END_TIME 为9999-12-31
删除数据:更新火龙果数据,END_TIME 为执行本文 3.2.3 节步骤的时间。
3. 操作步骤
3.1 创建中间表
拉链表与来源表中,共有字段名称相同;本文方案需要将来源表数据与拉链表数据进行全外连接,识别新增、删除、更新数据,数据关联前需要修改来源表和目标表字段名,便于区分字段。
所以将来源表数据同步到中间表中,后续对中间表进行操作。
1)新建定时任务,拖入「数据同步」节点,取出来源表数据。如下图所示:
2)将数据写入到中间表中,设置 ID 为中间表的主键。如下图所示:
写入方式选择「清空目标表,再写入数据」,便于后期定期执行任务。
3)右键数据同步节点,选择「运行节点」,运行该节点,生成中间表,便于后续使用。
3.2 数仓拉链表实现
3.2.1 中间表与拉链表取并集
中间表与拉链表字段后加后缀,便于后续数据关联后区分字段来源。
1)拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,读取中间表数据。如下图所示:
3)拖入「字段设置」算子,在中间表的每个字段名称后加上_CUR,方便后续数据关联后识别字段来源。如下图所示:
4)拉链表中没被删除过的数据,END_TIME 为9999-12-31,将没被删除过的数据筛选出来,与中间表关联。如下图所示:
SELECT * FROM `demotest`.`ZIPPER`
WHERE DATE_FORMAT(END_TIME, '%Y-%m-%d')='9999-12-31'
5)拖入「字段设置」算子,在拉链表的每个字段名称后加上_PRE,方便后续数据关联后识别字段来源。
其中拉链表主键字段 AUTO_INCREMENT_ID 不用加此后缀,因为中间表中没有包含此字段,该字段无需做处理便于后续区),如下图所示:
6)拖入「数据关联」算子,将中间表与拉链表数据取并集(全外连接)。如下图所示:
点击「数据预览」,如下图所示:
可看出:
ID_CUR(来源表)存在,ID_PRE(拉链表)无,代表新增的数据(上图红色标记的数据)。
ID_CUR(来源表)无,ID_PRE(拉链表)存在,代表删除的数据(上图紫色标记的数据)。
ID_CUR(来源表)=ID_PRE(拉链表),其他字段不一致,代表更新的数据(上图黄色标记的数据)。
后续可新增一列,对数据进行标记。
3.2.2 标记数据
3.2.1 节中数据关联后,我们已总结新增、删除、更新数据的特征。本节,我们标记这些数据。
1)拖入「新增计算列」算子,对数据进行标记。如下图所示:
注:公式中字段为点击生成。
IF(ISNULL(ID_PRE),"新增",IF(ISNULL(ID_CUR),"删除",IF(OR(NAME_CUR<>NAME_PRE,UNIT_CUR<>UNIT_PRE),
"更新","相同")))
2)点击「数据预览」,如下图所示:
3.2.3 分发数据准备
拉链表中数据应该:
场景 | 拉链表数据变化 |
---|---|
更新苹果数据 | 对已存在的苹果数据,END_TIME 更新为执行该算子的时间 将来源表中的苹果数据,新增到拉链表中,START_TIME 为执行该算子的时间,END_TIME 为9999-12-31 |
新增数据 | 新增橙子的数据,START_TIME 为1900-01-01(由于来源表没有时间戳字段,此为自定义的时间),END_TIME 为9999-12-31 |
删除数据 | 更新火龙果数据,END_TIME 为执行该算子的时间 |
所以,需要新增数据列:
1)START_TIME:值为1900-01-01,为新增数据的开始时间
2)END_TIME:值为9999-12-31,为新增数据的结束时间;更新-新插入数据的结束时间
3)TIME:执行该算子的时间,值为now()
拖入「新增计算列」算子,新增计算列。如下图所示:
3.2.4 数据分发
场景 | 字段对应关系 | ||||||
---|---|---|---|---|---|---|---|
新增数据 | Type 字段值为新增的数据: 将新增数据插入,其中:
| ||||||
删除数据 | Type 字段值为删除的数据: 将拉链表中该条数据的 END_TIME 更新为 TIME 的值now() | ||||||
更新-老数据更新 | Type 字段值为更新的数据: 将拉链表中该条数据的 END_TIME 更新为 TIME 的值now() | ||||||
更新-新数据插入 | Type 字段值为更新的数据: 将来源表中的这条数据新增到拉链表中,其中:
|
1)拖入 4 个「DB表输出」算子,与前面的「新增计算列」算子相连。
2)右键点击「新增计算列1」算子,点击「数据分发」。设置如下图所示:
3)点击「DB表输出」算子,处理新增数据,将新增数据输入到拉链表中。如下图所示:
将标记为新增的数据写入到拉链表中,拉链表中数据的开始时间对应新增字段 START_TIME ,值为 1900-01-01 ;拉链表数据的结束时间对应新增字段 END_TIME ,值为 9999-12-31 。
写入方式选择「追加写入数据」。
4)点击「DB表输出1」算子,处理删除数据,更新删除数据的 END_TIME 。如下图所示:
将标记为删除的数据写入到拉链表中,拉链表数据的结束时间对应新增字段 TIME ,值为now()
写入方式选择「插入/更新/删除数据」中的更新,标识字段为Type,标识值为删除,逻辑主键为AUTO_INCREMENT_ID。如下图所示:
5)点击「DB表输出2」算子,更新拉链表中存在的旧更新数据中 END_TIME。如下图所示:
将标记为更新的数据写入到拉链表中,拉链表数据的结束时间对应新增字段 TIME ,值为now()
写入方式选择「插入/更新/删除数据」中的更新,标识字段为Type,标识值为更新,逻辑主键为AUTO_INCREMENT_ID。如下图所示:
6)点击「DB表输出3」算子,将来源表中的更新数据,新增到拉链表中,拉链表中的 START_TIME 对应 3.2.3 节中的 TIME,拉链表中的 END_TIME 对应 3.2.3 节中的 END_TIME。如下图所示:
写入方式选择「追加写入数据」。
7)点击右上角「保存」按钮。
3.3 效果查看
点击右上角「保存并运行」按钮,运行成功后,拉链表如下图所示: