1. 概述
1.1 问题描述
用户在定时任务进行数据同步时,如果源数据库同时有增删改变化,且来源表有时间戳时,可以使用数据比对,但是如果数据量很大,想要更好的提高同步效率,就可以使用本文的方案进行数据同步。
注:时间戳一般是记录数据的增删改时间。
1.2 实现思路
通过两次数据更新的时间进行增删改的同步。
1)同步删除数据:通过比对主键,删除来源表中被物理删除但是目标表中还存在的数据。
2)使用时间戳同步新增和修改的数据:
获取上次同步的时候,来源表的最大时间戳,并设置为参数 cur_timestamp。
获取这次同步的时候,来源表的最大时间戳,并设置为参数 tmp_timestamp。
获取来源表相对目标表的增量数据,即 order_timestamp(来源表业务时间戳)>cur_timestamp 的数据,并将增加和更新的数据同步到目标表。
3)更新时间表同步时间戳和执行记录。
demo 示例详情请参见:https://demo.finedatalink.com/ 增删改数据同步(非数据比对且有时间戳)
2. 操作步骤
例如来源表中删除 1002 的数据,同时修改了 1003 的 order_quantiny 的数据,同时新增了 1005 的数据,需要将变化同步至目标表。
其中 order_business_date 为业务时间,也就是订单交易时间,order_timestamp 为时间戳,即记录数据增删改的时间。
2.1 前提条件
创建执行记录数据表,便于获取每次同步的时间戳,如下图所示:
注:两个时间字段初始值可以设置为一个比较早的时间,早于记录数据更新时间戳,以便初次同步全量数据。
2.2 同步来源表中删除的数据
通过比对主键,删除来源表中被物理删除但是目标表中还存在的数据。
创建定时任务后,使用「数据转换」节点,进入编辑界面,使用DB表输入功能,获取来源表中主键 order_id_from,如下图所示:
SELECT order_id AS order_id_from
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
使用DB输出算子,获取目标数据表的主键,如下图所示:
SELECT order_id AS order_id_target
FROM `demotest`.`F_ORDER_TIMESTAMP_TARGET`
使用「数据比对」算子,比对来源表和目标表的 order_id 主键,获取来源表删除但是目标表还存在的数据,如下图所示:
查看比对结果即可看到来源端删除的数据 1002 被标识出来,如下图所示:
使用 DB 输出算子将比对结果源表删除的数据从目标表中删除,设置写入方式为「插入更新删除数据」,只勾选「删除」,逻辑主键选择「order_id」,如下图所示:
2.3 使用时间戳同步新增和修改的数据
使用「参数赋值」获取上次同步的时候,来源表记录数据增删改的最大时间戳,如下图所示:
SELECT cur_timestamp
FROM `demotest`.`ETL_RECORD`
WHERE table_name='F_ORDER_TIMESTAMP_FROM'
并设置为参数 cur_timestamp,如下图所示:
获取这次同步的时候,来源表的最大时间戳,如下图所示:
SELECT MAX(order_timestamp) AS tmp_timestamp
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
并设置为参数 tmp_timestamp,如下图所示:
获取来源表相对目标表的增量数据,即 order_timestamp(源表记录数据增删改的时间戳)>cur_timestamp 的数据,并将增量数据同步到目标表。
新增数据转换节点,进入编辑界面,使用DB输入算子,取出增量数据并将当前的时间作为 etl_time,便于后续写入 2.1 节的记录数据表,如下图所示:
SELECT
*,
NOW() AS etl_time
FROM `demotest`.`F_ORDER_TIMESTAMP_FROM`
WHERE order_timestamp>'${cur_timestamp}'
将 order_timestamp(来源表业务时间戳)>cur_timestamp 的增加和更新的数据同步到目标表,如下图所示:
2.4 更新时间表同步时间戳和执行记录
为了便于下一次使执行任务时使用时间戳,因此需要将本次同步时的最大时间戳写入 2.1 节创建的执行记录表中,同时将上次同步记录时间写入 pre_timestamp 字段,etl执行次数写入 etl_time,如下图所示:如下图所示:
UPDATE `demotest`.`ETL_RECORD`
SET
cur_timestamp='${tmp_timestamp}',
pre_timestamp='${cur_timestamp}',
etl_times=etl_times+1,
etl_last_time=NOW()
WHERE table_name='F_ORDER_TIMESTAMP_FROM'
2.5 效果查看
保存并运行任务后,即可在数据库中看到增删改数据同步到目标表,如下图所示:
同时记录表 ETL_RECORD 结果如下图所示: