1. 概述
1.1 应用场景
用户从吉客云读取一张 1300W 数据量的表存储到阿里云的 ApsaraDB for MySQL。
「数据转换」节点中处理数据时,由于数据量过大,数据读取的时间超过了数据库允许 SQL 执行的最大时间(最大时间为 30 分钟);面对如此大的数据量,单纯的增加数据库查询等待时间解决不了问题,并且单并发的数据同步也满足不了数据同步的时间要求。
1.2 实现思路
具体实现步骤分为两个阶段:
初始化全量阶段
根据日期字段将数据分区:
以取出 2024-01 月数据为例:
1)每次取出 100000,计算取出次数,并输出为参数 fenye。
2)循环容器执行条件设置为 ${loopTimes}<${fenye}:
分页查询来源表中 2024 年 1 月的数据,每页查询最多 100000 条记录,并根据 ${loopTimes} 变量动态调整查询的起始位置,实现分页效果。
数据取出后,用户可根据实际情况,进行JSON解析、过滤等操作。
将处理后的数据落库。
增量阶段
1)根据目标表最大时间筛选出来源表新增数据,新增的数据按每页 500 条计算页数,将页数输出为参数 fenye。
2)循环容器执行条件设置为 ${loopTimes}<${fenye}:
获取每批 500 条的主键并作为参数输出。
根据主键参数,删除目标表中历史数据。
将本批新增数据处理后落库。
1.3 注意事项
本文方案中的 SQL 语句只适用于 ApsaraDB for MySQL 数据库,其他数据库需要自行调整 SQL 语句。
本文方案只适用于大数据量场景&数据只存在新增的场景。
2. 初始化全量阶段
根据日期字段对数据进行分区,取出每月的数据方案类似,本文只详细说明取出 2024-01 月份数据的步骤。
2.1 按照分区字段查询并统计循环次数
1)新建定时任务,拖入「参数赋值」节点,筛选出:2024-01 月时间段内,来源表中的记录按照每页 100000 条记录分页时,需要多少页。如下图所示:
SELECT ceil(count(*)/100000) as fenye FROM `jky`.`qm_trades_fullinfo` where tradeTime like "%2024-01-%"
点击「数据预览」按钮,如下图所示:
含义:如果后续查询 2024-01 月时间段内,且每次取出 100000 条记录,需要取出 22 次。
2)将 fenye 字段输出为参数 fenye。如下图所示:
2.2 循环容器设置
拖入「循环容器」节点,循环方式选择「条件循环」,执行条件为:${loopTimes}<${fenye}。如下图所示:
注:${loopTimes} 为循环容器内当前循环次数,初始值为1。具体说明请参见:${loopTimes}
2.3 取数并处理数据
1)「循环容器」节点中拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,读取数据。
本步骤目的:分页查询来源表中 2024 年 1 月的数据,每页查询最多 100000 条记录,并根据 ${loopTimes} 变量动态调整查询的起始位置,实现分页效果。
select tradeTime,goodsDetail from `jky`.`qm_trades_fullinfo`
where tradeTime like "%2024-01-%" limit 100000 offset 100000*(${loopTimes}-1)
3)数据取出后,用户可根据实际情况,进行JSON解析、过滤等操作,本文不再展示具体处理过程。
4)最后拖入「DB表输出」算子,将处理后的数据输出到目标表中。如下图所示:
5)运行任务,可取出 2024-01 时间内的数据。
2.4 后续步骤
参考本文 2.1-2.3 节内容,类似步骤取出 2024-02、2024-03 等月份下的数据,并都落库到相同目标表中即可。
3. 数据增量阶段
3.1 获取来源表新增数据条数并按照每页500条计算页数
1)拖入「参数赋值」节点,根据目标表最大时间筛选出来源表新增数据,新增的数据按每页 500 条计算页数。如下图所示:
select ceil(count(__dm_key)/500) as a from `jky`.`qm_trades_fullinfo`
where tradeTime >=(select max(tradeTime) from `jky`.`fanruan_test1`)
2)将数据输出为参数 fenye。如下图所示:
3.2 循环容器设置
拖入「循环容器」节点,循环方式选择「条件循环」,执行条件为:${loopTimes}<${fenye}。如下图所示:
注:${loopTimes} 为循环容器内当前循环次数,初始值为1。具体说明请参见:${loopTimes}
3.3 获取每批500条的主键并作为参数输出
1)「循环容器」节点内拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,获取每批 500 的主键。如下图所示:
select __dm_key from `jky`.`qm_trades_fullinfo`
where tradeTime >=(select max(tradeTime) from `jky`.`fanruan_test1`) limit 500 offset 500*(${loopTimes}-1)
3)拖入「参数输出」算子,将获取的主键字段输出为参数。如下图所示:
3.4 根据主键删除目标表历史数据
拖入「SQL脚本」,删除目标表中主键字段为 3.3 节输出参数值的数据。如下图所示:
delete from `jky`.`fanruan_test1` where __dm_key in (${dm_key})
3.5 将新增数据落库
1)拖入「数据转换」节点,进入「数据转换」节点。拖入「DB表输入」算子,获取每批 500 条数据。
select tradeTime,goodsDetail,__dm_key from `jky`.`qm_trades_fullinfo`
where tradeTime >=(select max(tradeTime) from `jky`.`fanruan_test1`) limit 500 offset 500*(${loopTimes}-1)
2)数据取出后,用户可根据实际情况,进行JSON解析、过滤等操作,本文不再展示具体处理过程。
3)最后拖入「DB表输出」算子,将处理后的数据输出到目标表中。如下图所示: