1. 概述
1.1 应用场景
在数据同步时,当历史数据量很大,为保证数据的时效性,需要对数据定时进行增量更新。
在增量更新过程中,若遇到字段异常值、脏数据等问题,将导致数据同步一段时间后任务失败。此时目标表中已同步了部分数据,需要将目标表进行数据回滚到此次增量更新前的数据。
1.2 实现思路
目标表中存在标识字段,标识字段有默认值,目标表中存在逻辑主键。
最新同步到目标表中的数据标识值为空,通过标识字段是否有值判断数据的新旧:
若数据同步一段时间后失败,目标表中删除标识值为空的数据即可。
数据同步成功后,若存在数据更新,会有两条逻辑主键相同的数据,删除标识值为默认值的那条数据即可;随后,将标识值为空的数据,把标识字段赋值为默认值。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「数据抽取失败后进行数据回滚-」。
2. 示例
2.1 场景模拟
需要将「订单信息」表中「订购日期」为近 30 天的数据同步到「S目标表」中,实现数据的新增和更新。若数据同步一段时间后失败,进行数据回滚,将目标表恢复到同步前的数据。
本次示例任务第一次执行时,时间为 2023-03-20 。
2.2 方案说明
「S目标表」中存在标记列,字段名为 flag ,值为 0 。数据同步时,写入方式选择「直接将数据写入目标表」,这样目标表中可以存在两条「订单ID」相同的数据:
注:「订单ID」不是物理主键,为逻辑主键。
1)若数据同步一段时间后失败,同步到目标表中的部分数据 flag 列值为空,此时删除 flag 列值为空的数据实现数据回滚。
2)若数据同步成功:
更新的数据同步后,目标表中会存在两条「订单ID」相同的数据,旧数据 flag 列值为 0 ,新同步的数据 flag 列值为空,此时删除 flag 列值为 0 的那条订单ID数据即可。
此时目标数据库中存在新增的数据和更新时保留的最新的数据,他们的 flag 列值为空,为便于下次任务执行,把 flag 列值赋值为 0 。
2.3 数据同步
1)新建定时任务后,添加一个「数据同步」节点。
2)从「订单信息」表中筛选出「订购日期」为近 30 天的数据。如下图所示:
select * from `demotest`.`订单信息` where date_sub(curdate(),interval 30 day) <= 订购日期
3)将数据同步到目标表中。由于想实现数据的更新和数据的回滚,写入方式选择「直接将数据写入目标表」,这样目标表中可存在两条「订单ID」相同的数据,便于后续进行数据的更新和回滚(详情参见后续步骤)。
如下图所示:
2.4 数据同步失败
若数据同步一段时间后失败,需要删除已同步到目标表的数据。
由于新同步进目标表的数据,flag 列的值为空(旧数据 flag 列的值为 0),所以删除 flag 列为空的数据即可。
新增「SQL脚本」节点,重命名为数据回滚,与「数据同步」节点相连。如下图所示:
DELETE FROM `demotest`.`S目标表` where flag is null
两个节点连线执行判断为失败时执行,如下图所示:
2.5 数据同步成功
若数据同步成功,则需要删除旧数据,并将新数据的标记值设为 0 。
2.5.1 删除旧数据
筛选「订单信息」表中「订购日期」为近 30 天的数据后,若数据中的「订单ID」在「S目标表」中也存在,数据同步成功后,目标表中存在两条「订单ID」相同的数据,需要把旧数据删除掉。
新增「SQL脚本」节点,重命名为删除旧数据,删除旧数据。如下图所示:
delete from `demotest`.`S目标表` where flag = 0 and 订单ID
in (
select a.订单ID from
(select 订单ID from `demotest`.`S目标表` where flag is null) a)
2.5.2 将新同步的数据进行标记
同步到目标表中的数据,此时 flag 列的值为空,将 flag 列的值改为 0 ,以便下次定时任务执行。
新增「SQL脚本」节点,重命名为标记新数据,标记同步到目标表中的数据。如下图所示:
update `demotest`.`S目标表` set flag=0 where flag is null
2.6 设置定时执行
发布定时任务到「生产模式」,点击「调度计划>定时调度>添加单个调度」,设置任务每隔 30 天执行一次。如下图所示:
2.7 运行任务
点击左上角「运行」按钮,即可执行任务。