反馈已提交

网络繁忙

获取目录下符合条件的文件,按写入方式读取入库并记录状态

  • 文档创建者:Wendy123456
  • 历史版本:7
  • 最近更新:Wendy123456 于 2024-11-04
  • 1. 概述

    1.1 应用场景

    用户业务流程中下发的文件,有特定的文件生成则代表下发完成。比如:文件下发完成后,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。

    现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

    注1:本文方案适用于 Linux 系统。

    注2:本文方案适用于读取 csv、excel、txt 文件,本文示例是读取的 csv 文件,读取其他类型的文件,需要根据实际情况修改方案细节。

    1.2 实现思路

    1)将需要遍历的目录生成参数,「Shell 脚本」节点使用该参数获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,拼接出待读取的 csv 文件路径,将数据保存到到数据库表中。

    2)根据第一步输出的数据库表,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。

    3)遍历第二步的参数,执行以下步骤:

    • 判断文件是否已读取,若未读取,执行下面步骤。

    • 读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。

    • 记录已写入 csv 文件的路径、写入表、写入时间。

    1.3 任务展示

    FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 中的定时任务:「获取目录下符合条件的文件,按写入方式读取入库并记录状态」。

    2. 示例

    2.1 场景模拟

    示例文件结构:csv.zip

    某银行用户,业务流程中下发的文件,若下发完成,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。如下图所示:

    1725245363179869.png

    其中,文件所处的目录结构如下图所示:

    csv 文件夹下的文件夹是以日期命名的(本文示例是以 2024 年 8 月份日期命名)。

    点击展开更多
    4.png

    现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

    希望每天检查一次,是否有新的已下发完成的文件。

    2.2 方案说明

    1)由于 csv 下,文件夹是以日期命名的,构建遍历目录并输出为参数,参数值示例/data/demo_files/csv/2024-08-01/

    2)「Shell 脚本」节点使用该参数遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,新增列拼接出待读取的 csv 文件路径,将数据输出到 path1 表中。

    3)根据 path1 表数据,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。

    4)遍历第三步的参数,执行以下步骤:

    • 根据待读取 csv 文件路径是否在 table_info 表中,判断文件是否已读取,若未读取,执行下面步骤。

    • 读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。

    • table_info 表记录已写入 csv 文件的路径、写入表、写入时间。

    61.png

    2.3 准备工作

    1)由于需要用到Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。

    2)「Shell 脚本」节点运行后,会生成一个 output.csv 文件,由于需要使用文件输入算子读取该文件,建议准备一个 FTP/SFTP 数据连接。

    2.4 构建遍历目录参数

    本文示例中,需要使用「Shell 脚本」节点到每个日期命名的文件夹中找到 .ok、.sql、.del 文件,所以需要先构建构建遍历目录并输出为参数。

    1725248286301164.png

    2.4.1 设置日期序列开始时间、结束时间

    实际场景中,会有每个月所有天的日期文件夹,由于需要遍历所有文件夹找到 .ok、.sql、.del 文件,需要构建日期序列。

    1)新建定时任务,拖入一个数据转换节点,进入数据转换节点。

    2)拖入「Spark SQL」算子,输入语句设置日期序列开始时间和结束时间。如下图所示:

    本文示例(构建的是 2024 年 8 月份的日期序列):

    SELECT DATE_FORMAT(CONCAT(YEAR(CURRENT_DATE()), '-08-01'), 'yyyy-MM-dd') AS s_date,

    LAST_DAY(CONCAT(YEAR(CURRENT_DATE()), '-08-01')) AS e_date

    其他方案(可构建当年当月第一天到当天的日期序列,用户根据实际情况选择):

    SELECT CONCAT(LEFT(current_date,7),"-01") as s_date,LEFT(current_date,10) as e_date

    8.png

    点击「数据预览」,如下图所示:

    1725259622140907.png

    2.4.2 构建日期序列

    再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列。如下图所示:

    SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL

    注:SQL 语句中的 Spark SQL 为点击生成。

    10.png

    点击数据预览,如下图所示:

    1725259837612850.png

    2.4.3 构建遍历目录并生成参数

    本文示例中,目录结构如下图所示:

    29.png

    后续步骤中,「Shell 脚本」节点需要到/data/demo_files/csv/2024-08-XX/下,找到符合条件的文件,本文需要构建遍历目录。

    1)再拖入一个「Spark SQL」算子,构建遍历目录。如下图所示:

    select concat('\/data\/demo_files\/csv\/',date,"\/") as path  from Spark SQL1

    注:Spark SQL1 为点击生成;用户根据实际路径修改 SQL 语句。

    13.png

    点击数据预览,如下图所示:

    1725260186227943.png

    2)拖入「参数输出」算子,将生成的路径输出为参数。如下图所示:

    注:建议设置调试值,便于后续节点引用该参数时查看效果;调试值不影响实际运行结果。

    15.png

    2.5 获取指定目录下符合条件的文件,并拼接下发完成的csv文件路径

    1725260644115264.png

    2.5.1 获取指定目录下符合条件的文件

    1)由于 2.4 节生成的参数为多个,「Shell 脚本」节点每次只能使用一个参数获取符合条件的文件信息,所以需要用到循环容器节点。

    拖入循环容器节点,遍历目录参数。如下图所示:

    17.png

    2)循环容器节点内拖入「Shell 脚本」节点,遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中。如下图所示:

    .sh文件示例:.sh文件.zip

    .sh文件代码如下所示:

    注:用户根据实际情况修改代码;代码中的 $1 为 2.4.3 节输出的参数名称。

    #!/bin/bash  
      
    # 指定要搜索的目录  
    #SEARCH_DIR="/data/demo_files/csv/2024-04-17/"
     SEARCH_DIR=$1
    # 指定输出CSV文件的名称  
    OUTPUT_CSV="/data/demo_files/csv/output.csv"  
      
    # 使用find命令搜索文件,并将结果输出到CSV文件  
    find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"  
      
    echo "搜索完成,结果已保存到 $OUTPUT_CSV"

    18.png

    output.csv 文件说明:

    「Shell 脚本」节点执行后,会生成一个 output.csv 文件,本文示例中,为避免 output.csv 文件后期过大,.sh 文件每次执行都是清空 output.csv 文件数据再写入,而非追加写入。output.csv 文件数据如下图所示:

    1725262112795196.png

    由于后续步骤中要读取 output.csv 文件,但此时该文件并未生成,建议在 output.csv 文件路径下上传一个示例文件(由于每次都是清空目标表再写入,示例数据不会影响实际任务运行结果)。如下图所示:

    示例文件:output文件.zip

    29.png

    2.5.2 拼接下发完成的csv文件路径

    下发完成的 csv 文件,保存在 file 文件夹中。如下图所示:

    1725263423649442.png

    本节需要根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径,便于后续读取 csv 文件数据。

    1)「Shell 脚本」节点后拖入一个数据转换节点,进入数据转换节点。拖入文件输入算子,读取 output.csv 文件数据。如下图所示:

    22.png

    点击数据预览,如下图所示:

    25.png

    2)拖入一个新增计算列算子,根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径。

    新增 name 列,值为:CONCATENATE(left(#{column},find("/_thread",#{column})),"file/",left(#{column1},find(".",#{column1})),"csv")

    新增 date 列,值为:FORMAT(NOW(),"yyyy-MM-dd hh:mm:ss")

    如下图所示:

    23.png

    点击「数据预览」,如下图所示:

    24.png

    3)拖入字段设置算子,修改字段名。如下图所示:

    27.png

    点击数据预览,如下图所示:

    path 字段值为 .ok、.sql、.del 文件路径,file 字段值为已经下发完成的 csv 文件路径。

    28.png

    4)拖入DB表输出算子,将数据输出到 path1 表中。如下图所示:

    30.png

    写入方式选择追加写入数据,逻辑主键设置为path字段,主键冲突策略选择主键冲突,覆盖目标表的数据。如下图所示:

    1725264334676565.png

    5)由于 path1 表后续会用到,点击运行按钮,运行任务生成 path1 表。如下图所示:

    32.png

    任务运行后,path1 表数据如下图所示:

    33.png

    2.6 将.del文件路径、待读取csv文件路径输出为参数

    1)「循环容器」节点外接一个「数据转换」节点,进入「数据转换」节点。

    2)拖入DB表输入算子,读取 path1 表数据。如下图所示:

    34.png

    3)拖入「Spark SQL」算子,过滤出 .del 文件的数据。如下图所示:

    .del 文件不同值代表不同的写入方式。

    35.png

    点击数据预览,如下图所示:

    36.png

    4)拖入参数输出算子,将 path、file 字段输出为参数。如下图所示:

    • path 字段值:不同值代表不同写入方式;可根据该值判断写入方式。

    • file 字段值:已下发完成的 csv 文件路径;可根据该值读取 csv 文件数据。

    1725265468595681.png

    2.7 根据不同写入方式写入数据

    62.png

    2.7.1 设置循环容器

    由于 2.6 节输出的参数为多个,「文件输入」算子每次读取 csv 文件时OR条件分支节点每次根据参数值进行判断时,只能使用一个参数。所以,需要使用到「循环容器」节点,遍历所有参数。

    拖入「循环容器」节点,遍历对象为 2.6 节输出的两个参数。如下图所示:

    39.png

    2.7.2 判断csv文件是否读取过

    每次循环时,判断传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的记录数是否为 0 ,若为 0 ,则执行后续节点。

    1)设计任务时,需要创建一张 table_info 表:table_info.xlsx

    表字段如下图所示:

    1725268863564333.png

    该表记录已写入 csv 文件的路径、.del 文件值(代表不同写入方式)、csv 文件写入表名称、写入时间。

    2)「循环容器」节点内拖入数据转换节点,进入数据转换节点。

    3)拖入DB表输入算子,统计传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的数量如下图所示:

    SELECT count(*) as t FROM `demotest`.`table_info`
    where file='${file}'

    41.png

    4)将 SQL 语句的值输出为参数。如下图所示:

    42.png

    5)拖入「条件分支」节点、数据转换节点,条件分支节点中设置若参数 t 为 0(该 csv 文件未被读取过),则执行后续节点。如下图所示:

    43.png

    2.7.3 获取写入方式并写入数据

    62.png

    写入方式记录在 .del 文件中,当值为 0 时,清空目标表再写入数据;当值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。

    使用 table_info 表记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

    1)进入「数据转换4」节点,拖入文件输入算子,读取 .del 文件。如下图所示:

    44.png

    2)拖入「参数输出」算子,将 .del 文件的值输出,便于后续判断写入方式。如下图所示:

    45.png

    3)拖入「条件分支」节点、三个数据转换节点,根据 m 参数值的不同,执行不同下游分支节点。

    当 m 值为 0 时,清空目标表再写入数据;值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。

    63.png

    数据转换5节点:全量更新数据

    拖入「文件输入」算子,读取 csv 文件。如下图所示:

    47.png

    点击「数据预览」,如下图所示:

    1725274893436443.png

    拖入DB表输出算子,将数据输出到数据库表中。如下图所示:

    49.png

    数据转换6节点:增量更新数据

    1725275182845703.png

    增量更新数据会写入到数据库表 b 中,设计任务时,需要在数据库中创建一张表 b ,便于与 csv 文件数据对比。

    表 b 数据示例:b.xlsx

    表 b 数据如下图所示:

    1725275327957360.png

    拖入文件输入算子,读取 csv 文件。如下图所示:

    52.png

    拖入「DB表输入」算子,读取表 b 数据。如下图所示:

    53.png

    拖入「数据比对」算子,对比数据。如下图所示:

    54.png

    拖入「DB表输出」算子,执行新增操作。如下图所示:

    55.png

    数据转换7节点:进行比对后,新增和修改数据

    1725276083552045.png

    新增、修改数据会写入到数据库表 c 中,设计任务时,需要在数据库中创建一张表 c ,便于与 csv 文件数据对比。

    表 c 数据示例:c.xlsx

    表 c 数据如下图所示:

    1725276189707427.png

    数据转换6节点不同的是,「DB表输出」算子设置不同:

    56.png

    2.7.4 记录已经读取完成的文件

    58.png

    table_info 表记录已写入 csv 文件的路径、写入表、写入时间。

    1)拖入三个「数据转换」节点,分别与前面的数据转换节点相连。

    2)进入数据转换8节点,拖入Spark SQL算子,SQL 语句如下:

    select '${file}' as file,'${m}' as m ,'a' as table,current_timestamp as time

    获取已写入 csv 文件的路径、.del文件值(不同值代表不同写入方式)、最终数据写入表名、当前时间,如下图所示:

    59.png

    3)拖入「DB表输出」算子,将数据写入到 table_info 表中。如下图所示:

    60.png

    数据转换9节点数据转换10节点数据转换8节点不同的是,Spark SQL算子的 SQL 语句:

    数据转换9节点:select '${file}' as file,'${m}' as m ,'b' as table,current_timestamp as time

    数据转换10节点:select '${file}' as file,'${m}' as m ,'c' as table,current_timestamp as time

    3. 效果查看

    1)点击运行按钮,任务执行成功后,如下图所示:

    65.png

    2)path1 表数据如下图所示:

    64.png

    table_info 表数据如下图所示:

    1725280817712230.png

    a 表数据如下图所示:

    1725280911770101.png

    b 表数据如下图所示:

    1725280951992517.png

    c 表数据如下图所示:

    1725280987532082.png

    3)点击「发布」按钮,将任务发布到生产模式。如下图所示:

    70.png

    可设置执行频率。详情请参见:定时调度




    附件列表


    主题: 数据开发-定时任务
    • 有帮助
    • 没帮助
    • 只是浏览
    • 评价文档,奖励 1 ~ 100 随机 F 豆!

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭



    AI

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持