历史版本12 :单个管道任务管理 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 版本说明

FineDataLink 版本
功能变动
4.0.27
  • 管道任务提示优化

  • 支持快速批量选择需要同步的数据表

  • 支持文件夹管理管道任务

  • 脏数据处理优化,支持对单表和多表生成的脏数据进行忽略、重试或者重新同步。

  • 任务日志提示优化

4.1.2
  • 支持自定义历史统计周期查看同步情况,详情参见本文 3.1 节

  • 运行中的任务支持查看任务配置情况,详情参见本文第二章

  • 新增显示待同步写入延迟时间,并能查看同步数据表「最新读取消息时间」和「最新写入消息时间」,详情参见本文 3.1.1 节

  • 脏数据展示优化,详情参见本文 3.2 节

  • 脏数据处理优化,详情参见本文 3.3 节

1.2 功能说明

在设置好管道任务后,FineDataLink 支持对数据实时同步进行监控,查看任务运行情况,快速查看和处理脏数据。

2. 任务运行状况编辑

点击任务列表中的任务,可以看到任务是否在运行中。同时可以手动停止任务,或者是进入任务编辑界面,如下图所示:

任务运行状况展示内容
运行状态
任务最近一次启动时间

来源端数据读取时间

目标端数据读取时间

来源端数据类型

目标端数据类型

启动/暂停任务
进入任务编辑界面
对于运行中、暂停或者终止、草稿、待启动各个状态的任务,可点击「编辑」按钮,修改或者查看任管道任务的配置项,能修改的内容如下图表所示:

功能
运行暂停&中止草稿待启动
选择来源(配置页)数据来源类型查看查看查看&编辑查看&编辑
数据来源数据连接查看查看&编辑查看&编辑
数据源权限检测使用使用使用
读取方式查看查看&编辑查看&编辑
同步类型

不允许切换存量&仅增量。

选择仅增量时,允许调整增量起点。

同步对象

允许增减同步对象。

1)新增同步对象:新增的表将按照所选的同步类型进行同步。

  • 同步类型为存量+增量:新增的表需要进行存量同步,那么增量同步将在后台挂起,等新增的表存量跑完,再继续开始增量

  • 同步类型为仅增量:

    • 修改了增量起点:所有表(包含新增的表)按照指定增量起点进行同步

    • 没有修改增量起点:新增表,按照任务内置的断点进行同步

2)删除同步对象:删除同步对象并保存时,将同时删除此对象相关的所有关联信息,任务启动时,将不再同步对应表。

选择去向(配置页)数据去向类型查看
数据去向数据连接
模式
源端删除同步方式
时间戳同步
同步表结构变化(DDL)

查看&编辑

支持调整DDL同步开关



表字段映射(配置页)字段映射

新增的表可以配置字段映射。

已有同步中的表仅可查看字段映射,不可编辑调整。


管道控制(配置页)脏数据阈值

查看&编辑


失败重试查看&编辑
结果通知查看&编辑

3. 统计日志编辑

3.1 时间维度查看实时同步情况

点击「管理活动」即可看到当前任务的历史同步趋势。

3.1.1 实时统计

点击实时统计,即可查看任务历史读取总量,即任务从首次运行开始一共读取、输出的数据总量、待同步数据量、读取和输出数据的实时速率。

注:若使用了重新同步,单表的读取和写入会被重新计算,「重新同步的数据量」不会累加进「实时统计」的读取总量和输出总量里,但是会累加进「历史统计」中。

此时待同步下可查看全任务、单个数据表的待同步写入延迟时间。

写入延迟时间=最新一条写入目标端的消息对应的时间-最新一条源端日志消息对应时间

3.1.2 历史统计

在 4.1.2 及以上版本,点击「历史统计」,用户可以选择近2小时、近24小时、近3天、近7天、近15天等等各种时间段的实时同步详情。

展示同步读取数据总和待同步行输出总量、以及读取和输出数据的实时速率(行/s)),如下图所示:

2023-11-27_14-50-40.gif

同时也可以自定义筛时间区间,查看选定区间的数据同步情况,如下图所示:

3.2 数据表维度查看同步情况

同步对象模块下可查看到所有来源表的同步情况,如下图所示:


功能说明
A来源数据表数据表名
B数据表读写时间

「实时统计」显示

最新读取:最新读取源表数据时间

消息时间:最新一条源端日志消息对应时间

最新写入:最新写入目标表数据时间

消息时间:最新一条写入目标端的消息对应的时间

C同步状态表当前同步阶段:增量同步、存量同步
D脏数据
  • 显示脏数据条数

  • 点击脏数据,即可根据各错误原因分类、时间、关键字等进行脏数据筛选,查看脏数据详情

  • 点击「导出脏数据明细」即可将脏数据导出

注:对于主键字段值由A更新为B的数据,当主键更新事件失败时,拆解为两条脏数据记录:主键为A的数据删除、主键为B的数据upsert;

导出时,按照字段映射配置中的目标端表结构导出;

E待同步量待同步数据量
F读写统计

详细描述数据插入、更新、删除条数

显示读写速度 

注:仅可在「历史统计」中查看

G查看日志查看单表任务运行日志
H重新同步支持对单表和整个管道任务进行重新全量同步,会将目标端表清空并重新执行全量同步、在全量同步结束后转入增量同步;

在任务被开启重新全量同步后,日志统计信息会重置(输入行、输出行);

注:若开启逻辑删除标识时,重新同步对目标表清表重写,重写采用insert逻辑,出现提示“任务运行期间产生的逻辑删除数据将会被清空,请确认”。

用户点击脏数据,即可筛选出有脏数据的来源表,然后点击指定的数据表,即可看到详细的脏数据错误原因,更便于进行后续的脏数据处理,错误分类和原因如下表所示:

错误分类
错误原因
详细错误信息
字段的数据类型不匹配<column_name>字段的数据类型不匹配一个或多个字段<column_name>的预期数据类型和实际数据类型不匹配,或者与过去收到的数据类型不同,请更新目标表的字段类型。
如:在布尔字段中找到字符串值 ,或不可为空字段中的空值。
数据的长度超出字段长度数据的长度超出<column_name>字段长度数据的大小大于列<column_name>的大小,请更新目标表的字段类型。
如:在VARCHAR(255)字段中找到长度为1000的字符串。
目标字段不存在列<column_name>不存在,请创建相关字段在字段映射中指定的一个或多个目标列<column_name>在目标表中不存在,请创建相关字段。
目标表不存在目标表<table_name>不存在在字段映射中指定的目标表<table_name>不存在,请创建相关表。
数据去向不可用数据去向<data_connection_name>连接失败数据去向<data_connection_name>连接失败,请检查网络是否连通、账号密码是否正确、账号是否有权限等。
写入时权限不足目标表<table_name>无写入权限目标表<table_name>无写入权限,请调整相关权限后重试
全量同步阶段出现错误全量同步阶段出现错误具体错误详情
其他错误其他堆栈详情

3.3 脏数据处理

3.3.1 单条脏数据处理

若需要对单条脏数据进行处理,则直接在选择指定数据表的脏数据,然后进行重试或者忽略操作即可,如下图所示:

3.3.2 脏数据批量处理

若需要对当前任务中全量脏数据或者是指定单表的脏数据进行批量处理,则选择指定的来源库数据表,可勾选任务中的单表或者多表,进行脏数据的忽略、重试或者重新同步,如下图所示:

处理方式说明
忽略脏数据

对单表和指定多表,忽略功能会将缓存的脏数据进行删除,且删除后不可找回;

同时这部分数据的行数从日志统计的脏数据行数去掉;

重试脏数据

对单表和指定多表,重试功能会将缓存的脏数据进行再次提交,并更新数据量统计;

注:存量同步阶段产生的脏数据不支持重试。

重新同步支持对单表和整个管道任务进行重新全量同步,会将目标端表清空并重新执行全量同步、在全量同步结束后转入增量同步;

在任务被开启重新全量同步后,日志统计信息会重置(输入行、输出行);

注:若开启逻辑删除标识时,重新同步对目标表清表重写,重写采用insert逻辑,出现提示“任务运行期间产生的逻辑删除数据将会被清空,请确认”。

注1:重新同步需要在任务运行期间使用,若任务为运行状态,任务中存在没有全量完成的表时,无法使用重新同步。忽略和重试(在任务运行、暂停、中止时均可进行)。

注2:当数据源是kafka ,不支持重新同步。

注3:若任务为暂停或者中止状态,重新同步后,需要在下次启动时才会真正执行重新同步的逻辑。

脏数据事件记录逻辑:

  • 若在t1时间点,主键为A的数据产生了脏数据,则在t1的脏数据未被处理前、t1以后的关于主键A的所有数据均计入脏数据队列,原因也记录为与t1的脏数据一致的原因;

  • 若在t1时间点,主键为A的数据被更新为主键为B,则管道应拆解为两个事件进行顺序处理至目标端:Delete A,Insert B;

注意事项:

输出端为批量装载模式的制约

当输出端为批量装载模式时,此时一般会使用一个包括很多数据条数的大批次进行一次性提交数据到目标端,这类数据源可能无法识别到过程中哪些数据出现了脏数据,目前已有的支持批量装载的目标端具体分析如下:

1)GaussDB 200 支持两种写入模式,分别为copy和并行装载:

copy模式:

  • 在全量阶段,当单批次提交失败后,会将这个批次的数据改为调用JDBC接口进行写入(目前单个批次1024条),JDBC写入时能获取到具体错误数据,进而实现脏数据明细展示,但是JDBC方式性能不佳;

  • 在增量阶段,当单批次提交失败后,会将整个批次的数据记为脏数据(目前单个批次最大是5M,约合一般1万行内,具体行数与单条数据大小有关);

并行装载(load):

  • 并行装载时,也是将单次任务内所有需要的数据进行一次性提交,但是并行装载提供接口可以查询具体错误数据,进而FineDataLink 也可以实现脏数据明细查询。

2)HIVE 的HDFS写入方式:

直接一次性写入到HDFS文件、完全无法获取明细错误数据,不支持脏数据管理;

4. 运行日志编辑

4.1 可视化日志

点击「运行日志」即可看到当前管道任务的历史运行日志记录,展示日志的时间、等级、分类、描述,如下图所示:

同时可进行日志查询和筛选,如下图所示:

注:支持根据任务筛选日志和根据表筛选日志,同时可以查看各种类型的日志;也可根据查看断点时间。

4.2 日志等级

日志分为四级:BASIC INFO、INFO、WARN、ERROR 四级。

其中BASIC INFO 作为基础等级的日志。

也可以点击每一条日志对应的详情查看具体的说明,如下图所示: