历史版本10 :数据抽取失败后进行数据回滚 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 应用场景

在数据同步时,当历史数据量很大,为保证数据的时效性,需要对数据定时进行增量更新。

在增量更新过程中,若遇到字段异常值、脏数据等问题,将导致数据同步一段时间后任务失败。此时目标表中已同步了部分数据,需要将目标表进行数据回滚到此次增量更新前的数据。

1.2 实现思路

「S目标表」中存在标记列,字段名为 flag ,值为 0 。数据同步时,写入方式选择「追加写入数据」,这样目标表中可以存在两条「订单ID」相同的数据:

注:「订单ID」不是物理主键,为逻辑主键。

1)若数据同步一段时间后失败,同步到目标表中的部分数据 flag 列值为空,此时删除 flag 列值为空的数据实现数据回滚。

2)若数据同步成功:

  • 更新的数据同步后,目标表中会存在两条「订单ID」相同的数据,旧数据 flag 列值为 0 ,新同步的数据 flag 列值为空,此时删除 flag 列值为 0 的那条订单ID数据即可。

  • 此时目标数据库中存在新增的数据和更新时保留的最新的数据,他们的 flag 列值为空,为便于下次任务执行,把 flag 列值赋值为 0 。

1.3 任务展示

FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「场景案例>数据仓库场景>数据抽取失败后进行数据回滚」。

31.png

2. 示例编辑

2.1 场景模拟

示例数据表:S目标表.xls订单信息.xls

需要将「订单信息」表中「订购日期」为近 30 天的数据同步到「S目标表」中,实现数据的新增和更新。若数据同步一段时间后失败,进行数据回滚,将目标表恢复到同步前的数据。

本次示例任务第一次执行时,时间为 2023-03-20 。

2.2 数据同步

1)新建 ETL 任务后,添加一个「数据同步」节点。

2)从「订单信息」表中筛选出「订购日期」为近 30 天的数据。如下图所示:

select * from 订单信息 where date_sub(curdate(),interval 30 day) <= 订购日期

25.png

3)将数据同步到目标表中。由于想实现数据的更新和数据的回滚,写入方式选择「追加写入数据」,这样目标表中可存在两条「订单ID」相同的数据,便于后续进行数据的更新和回滚(详情参见后续步骤)。

如下图所示:

26.png

2.3 数据同步失败

若数据同步一段时间后失败,需要删除已同步到目标表的数据。

由于新同步进目标表的数据,flag 列的值为空(旧数据 flag 列的值为 0),所以删除 flag 列为空的数据即可。

新增「SQL脚本」节点,重命名为数据回滚,与「数据同步」节点相连,连线执行判断为失败时执行。如下图所示:

DELETE FROM S目标表 where flag is null

32.png

2.4 数据同步成功

若数据同步成功,则需要删除旧数据,并将新数据的标记值设为 0 。

2.4.1 删除旧数据

筛选「订单信息」表中「订购日期」为近 30 天的数据后,若数据中的「订单ID」在「S目标表」中也存在,数据同步成功后,目标表中存在两条「订单ID」相同的数据,需要把旧数据删除掉。

新增「SQL脚本」节点,重命名为删除旧数据,删除旧数据。如下图所示:

delete from S目标表 where flag = 0 and 订单ID 
in ( 
select a.订单ID from 
(select 订单ID from S目标表 where flag is null) a)

33.png

2.4.2 将新同步的数据进行标记

同步到目标表中的数据,此时 flag 列的值为空,将 flag 列的值改为 0 ,以便下次定时任务执行。

新增「SQL脚本」节点,重命名为标记新数据,标记同步到目标表中的数据。如下图所示:

update S目标表 set flag=0 where flag is null and 订购日期 >= DATE_SUB(curdate(),INTERVAL 30 day)

34.png

2.5 设置定时执行

设置任务 30 天执行一次。如下图所示:

1679300993524348.png

2.6 运行任务

点击右上角「保存并运行」按钮,可执行任务。