1. 概述编辑
1.1 应用场景
业务系统随着上线时间的加长,产生的系统数据越来越多。对于一些不再使用但又不能删除的数据(比如医院的患者档案),或者不会再大量修改只会进行查询的数据,用户希望把这些数据存放到归档库中,减轻业务系统的压力。
业务系统中只保留最近三年数据(活跃的数据),性能得到提升。
1.2 实现思路
下图中的序号代表执行的先后顺序:
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ :
1)管道任务:
2)定时任务:「场景案例>数据开发场景>数据归档」。
2. 场景模拟编辑
业务系统中每天会新增大量数据,对于不再使用的旧数据,希望保存到归档库中,且业务数据库保留最新三年的数据。
数据库 | 表信息 |
---|---|
业务数据库 test_1 | |
中间库 test_2 | 将 test_1 中的两张表分别重命名为:order_information、stock 「order_information」表中,「订单ID」为主键;「stock」表中,「ID」为主键 |
归档库 test_3 | 「order_information」表中,需要将「订单ID」设为主键;「stock」表中,需要将「ID」设为主键 |
3. 业务数据库实时同步到中间库编辑
本节内容执行步骤一,将业务数据库中表信息实时同步到中间库中。
3.1 准备工作
数据管道详细介绍请参见:配置数据管道任务
1)数据管道支持的数据源详情参见:数据管道支持的数据源
2)前提条件请参见:使用数据管道的前提条件
3)配置传输队列,其实就是配置暂存来源库数据,方便目标库写入数据的「数据管道」,目前是通过 Kafka 实现的。
详情参见:配置传输队列
3.2 选择数据源
首先选择需要数据同步的来源数据。
点击「数据管道>新建任务」,进入任务设置界面,选择来源数据库以及需要进行数据同步的数据表,默认读取方式为 Binlog,默认先对所有存量数据同步,然后持续同步新增变化。
将已经存在的「客户订单信息」表、「库存信息」表选到「要同步的表」中。
如下图所示:
3.3 选择数据去向
由于同步后的数据要存放到归档库中,为方便以后在归档库中查询历史数据,数据管道同步时,执行逻辑删除(在目标表追加标记字段,用于记录数据删除状态);并且在目标表追加时间戳字段,用于记录数据新增或更新的时间。
点击「下一步」进入「选择去向」界面,选定目标数据库。勾选「目标端执行逻辑删除」,开启「同步时标记时间戳」、「同步源表结构变化」按钮。如下图所示:
3.4 设置表字段映射
1)点击「下一步」进入「字段映射」设置界面。
2)本文示例目标表为新建,来源表「客户订单信息」同步到目标表「order_information」表中,目标表中设置「订单ID」字段为主键;来源表「库存信息」同步到目标表「stock」表中,目标表中设置「ID」字段为主键。映射方式都选择「同名映射」。如下图所示:
在此界面,可看到目标表中自动生成字段_fdl_marked_deleted(用于记录数据删除状态)、_fdl_update_timestamp(毫秒级时间戳的形式记录数据在数据库中实际新增和更新的时间)。如下图所示:
3.5 设置管道控制
1)点击「下一步」进行数据管道的任务设置。
2)数据同步允许一定的容错,比如字段类型、长度不匹配、主键冲突等等问题,可以设置产生的脏数据上限,达到上限则自动终止管道任务。勾选「源表结构变化」按钮,同时设置当任务异常时的通知。如下图所示:
3.6 效果查看
点击右上角的「保存并启动」按钮,执行管道任务。任务执行成功后,业务数据库表信息已经同步到中间库中。如下图所示:
4. 中间库通过FDL同步到归档库且删除业务库三年前数据编辑
业务数据库 test_1 中的数据通过数据管道实时同步到中间库 test_2 后,需要把中间库 test_2 中的数据定时同步到归档库 test_3 中,进行数据归档,然后删除业务数据库三年前的数据。
先执行步骤二,再执行步骤三:
1)新建定时任务,新增「数据同步」节点,筛选出中间库 stock 表前一天的数据 。如下图所示:
SELECT * FROM stock
WHERE FROM_UNIXTIME(_fdl_update_timestamp/1000,'%Y-%m-%d') = date_sub(curdate(),interval 1 day)
2)将筛选出的数据同步到归档库 test_3 的 stock 表中,写入方式选择「追加写入数据」,当主键冲突时,覆盖目标表数据,实现数据的更新。如下图所示:
3)增加一个「SQL脚本」节点,与「数据同步」节点相连,删除业务库中「库存信息」表三年前的数据。如下图所示:
DELETE FROM 库存信息 WHERE 日期 < DATE_SUB(NOW(), INTERVAL 3 YEAR)
4)筛选出中间库 order_information 表前一天的数据,同步到归档库 test_3 的 order_information 表中。同步时,写入方式选择「追加写入数据」,当主键冲突时,覆盖目标表数据,实现数据的更新。如下图所示:
SELECT * FROM order_information
WHERE FROM_UNIXTIME(_fdl_update_timestamp/1000,'%Y-%m-%d') = date_sub(curdate(),interval 1 day)
5)增加一个「SQL脚本」节点,与前一个「数据同步」节点相连,删除业务库中 order_information 表三年前的数据。如下图所示:
DELETE FROM 客户订单信息 WHERE 订购日期 < DATE_SUB(NOW(), INTERVAL 3 YEAR)
6)点击「调度配置」,设置任务每天 0 点执行。如下图所示:
7)点击右上角「保存并运行」按钮。
5. 结果展示编辑
定时任务执行后:
1)中间库的前一天数据同步到归档库中。如下图所示:
2)业务库中的「客户订单信息」表、「库存信息」表已删除三年前数据。如下图所示:
「库存信息」表删除前后最小日期对比:
「客户订单信息」表删除前后最小日期对比:
3)业务库删除数据后,中间库 test_2 中,stock 和 order_information 表中,3 年前数据 _fdl_marked_deleted 字段的值变为 true ,_fdl_update_timestamp 字段值更新。如下图所示: