1. 概述编辑
1.1 应用场景
数仓调度场景 中,将各个业务系统的原始数据使用 FDL 抽取至数据仓库的 ODS层,由于读取的是业务系统的历史全量数据,数据量较大。如果同步过程中任务报错,数据将同步一半导致数据重复。
希望能避免这个问题发生。
注:适用于新增、更新数据,且目标表中无主键。
1.2 实现思路
源数据表和目标表都有一个标记字段,初始值为 0 ;由于目标表中无主键,数据写入方式选择追加写入,新数据不会覆盖目标表中的旧数据。
目标表中,将近 30 天的数据标记为 1 ,即旧数据标记为 1 ;如果标记到一半失败,将近 30 天且标记为 1 的数据标记为 0 。
使用「数据同步」节点,将近 30 天的数据同步到目标表中(标记字段也同步到目标表中)。
若同步成功,此时目标表中存在近 30 天标记为 1 的旧数据,近 30 天标记为 0 的新数据,大于 30 天标记为 0 的老数据,此时将近 30 天标记为 1 的旧数据删除即可。
若同步到一半失败,将近 30 天标记为 0 的新数据删除,近 30 天标记为 1 的数据标记恢复为 0 。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「场景案例>数据仓库场景>避免数据同步一半」。
2. 示例编辑
2.1 场景模拟
需要将「订单信息」表中「订购日期」为近 30 天的数据同步到「S目标表」中,同时删除「S目标表」中「订购日期」为近 30 天的数据。
「S目标表」和「订单信息」都有字段「biaoji」,默认为 0 。
注:「S目标表」没有主键。
2.2 标记旧数据
新建 ETL 任务后,添加一个「SQL脚本」节点,命名为「标记旧数据」。
在「S目标表」中,「订购日期」为近 30 天的数据标记为 1 。如下图所示:
update S目标表 set biaoji=1 where biaoji=0 and 订购日期 >= DATE_SUB(curdate(),INTERVAL 30 day)
2.3 旧数据标记失败
若「标记旧数据」节点运行失败,恢复旧数据。
拖入「SQL脚本」节点,重命名为恢复旧数据,并于「标记旧数据」节点相连,执行判断为失败时执行。
注:连线执行判断详细介绍请参见:连线执行判断
「S目标表」中,「订购日期」为近 30 天且标记为 1 的数据标记为 0 。如下图所示:
update S目标表 set biaoji= 0 where biaoji=1 and date(订购日期) >= DATE_SUB(curdate(),INTERVAL 30 day)
2.4 同步数据
将「订单信息」中的数据同步到「S目标表」中。
1)添加「数据同步」节点,并于「标记旧数据」节点相连。如下图所示:
2)进入「数据同步」节点,添加「DB表输入」算子和「DB表输出」算子,两个算子相连。
「DB 表输入」节点取出近 30 天数据:
select * from 订单信息 where date_sub(curdate(),interval 30 day) <= 订购日期
「DB表输出」算子将取出数据同步到「S目标表」中:
2.5 数据同步失败
若「数据同步」节点执行到一半失败,需要删除新增数据,并恢复旧数据。
2.5.1 删除新增数据
添加「SQL脚本」节点,重命名为删除新增数据,与「数据同步」节点相连,执行判断为失败时执行
将「S目标表」中标记为 0 且「订购日期」为近 30 天的数据删除。如下图所示:
DELETE FROM S目标表 where biaoji=0 and 订购日期 >= DATE_SUB(curdate(),INTERVAL 30 day)
2.5.2 恢复旧数据
添加「SQL脚本」节点,重命名为恢复旧数据1,与「删除新增数据」节点相连。
将「S目标表」中标记为 1 且「订购日期」为近 30 天的数据标记为 0。如下图所示:
update S目标表 set biaoji=0 where biaoji=1 and 订购日期 >= DATE_SUB(curdate(),INTERVAL 30 day)
2.5 数据同步成功
数据同步成功,需要删除「订购日期」为近 30 天的旧数据。
添加「SQL脚本」节点,重命名为删除旧数据,与「数据同步」节点相连。
删除「订购日期」为近 30 天的旧数据:
2.6 运行任务
点击右上角「保存并运行」按钮,可执行任务。