历史版本2 :管道任务示例 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

本文以 MySQL 数据同步为例,将 demotest 数据库中的订单信息库存信息成本中心对照表」实时同步至 demo1 数据库中。

2. 操作步骤编辑

2.1 准备工作

需准备一个独立部署的 FDL 工程,该工程已注册 数据管道相关功能点 

步骤
步骤一:数据源配置

根据实际情况选择源端和目标端数据库,数据管道支持的数据库请参见:数据管道支持的数据源类型

在数据连接管理中配置源端和目标端的数据连接,便于在管道任务配置过程中,通过选择数据源名称来控制同步任务的读取和写入数据库。详情请参见:配置数据连接

步骤二:准备数据库环境

基于需要设置数据管道任务的数据源,授予数据源配置的账号在数据库进行相应操作的权限。详情请参见:数据库环境准备概述

步骤三:管道任务环境准备

部署 Kafka 开源流处理平台作为中间件。详情请参见:部署Kafka配置传输队列

步骤四:分配管道任务权限

若需要使用数据管道的用户不是超级管理员,则需要为对应用户分配数据管道的使用权限。详情请参见:管道任务管理权限

2.2 新建管道任务

进入 FDL 工程,点击「数据管道」,新建一个管道任务。如下图所示:

6.png

2.3 配置管道任务

2.3.1 选择数据来源

数据源选择 fdl_demotest 数据连接后,点击右侧的「数据源权限检测」按钮,可以查看数据源是否有日志读取权限。

同步类型选择「存量+增量同步」,先对所有存量数据同步,然后持续同步新增变化。

同步对象选择 demotest 数据库中的订单信息库存信息成本中心对照表」。如下图所示:

更多设置项介绍请参见:配置管道任务-选择数据来源

7.png

2.3.2 选择去向

更多设置项介绍请参见:配置管道任务-选择数据去向

1)如下图所示:

8.png


步骤
含义
「库」设置为demo1数据实时同步到 demo1 数据库中
源端删除数据,选择目标端执行逻辑删除来源表删除数据,目标端不实际删除数据;目标表将新增一个名称为_fdl_marked_deleted的布尔型字段(字段默认为false),用于记录数据删除状态。来源数据表删除了一条数据,此时同步至目标表,目标表不进行物理删除,而是将_fdl_marked_deleted字段更新为 true 
开启同步时标记时间戳按钮所有目标表将新增一个名称为_fdl_update_timestamp的长整型字段,以毫秒级时间戳的形式记录数据在数据库中实际新增和更新的时间(数据库所在时间
开启同步源表结构变化按钮数据管道任务支持同步源库DDL功能,开启该按钮后,在源库发生DDL(删除表、新增字段、删除字段、修改字段名称、修改字段类型(长度修改 & 兼容类型修改))时,管道任务可以自动同步这些来源端变化至目标端,不需人为介入修改目标表结构

2)点击下一步


2.3.3 表字段映射

1)我们发现,目标表中表名称标红,这是因为目标表名称中不能包含中文、特殊字符。

依次为待同步的三张表:修改目标表名称,为目标表设置逻辑主键,如下图所示:

9.png

2.3.2 节步骤中,我们开启了同步时标记时间戳按钮、目标端执行逻辑删除」,目标表中新增_fdl_update_timestamp、_fdl_marked_deleted字段。如下图所示:

1705994117614456.png

本步骤中,目标表可以选择已存在表、可批量修改表名、自动识别来源表的物理主键等,详情请参见:配置管道任务-表字段映射

2)点击下一步


2.3.4 管道控制

如下图所示:

1705994942431550.png

更多设置项介绍请参见:配置管道任务-管道控制

步骤
含义
脏数据阈值为 1000 行

某次任务运行期间脏数据达到1000条后,任务会异常中

注1:限制最多10w行,且重启任务后,会重置阈值统计。

注2:若需要对产生的脏数据进行处理,详情参见:脏数据处理

失败重试:重跑 3 次,每次间隔 2 分钟管道任务失败后,重试 3 次,每次间隔 2 分钟
结果通知

将源表结构变化、异常或脏数据导致的任务终止信息以邮件形式通知用

日志等级设置INFO 为普通信息,用于记录运行状态或重要事件,可打印详细的日志,供用户查看

2.4 效果查看

1)可查看读取行数、写入行数。如下图所示:

12.png

可看到「库存信息」表同步时产生 1 条脏数据,可参考 文档 对脏数据进行处理。

2)demo1 数据库中可查看到已同步的 3 张表。如下图所示:

1705995240991026.png

3)demotest 数据库中的成本中心对照表」中:

  • 删除 collid 为 1.01.02 的数据。

  • collid 为 1.01.03 的数据,fname 改为 产品研发部。

  • 新增 test 字段。

demo1 数据库中的 cost 表如下图所示: