反馈已提交

网络繁忙

FTP服务器中白名单文件自动传输与解压

  • 文档创建者:Wendy123456
  • 历史版本:5
  • 最近更新:Wendy123456 于 2024-09-12
  • 1. 概述

    1.1 应用场景

    用户的业务数据以压缩包的形式每日进行下发,存储在 FTP 文件服务器对应的日期目录下,已经下发完成的压缩包文件会生成同名的 .ok 文件。

    用户需要将部分压缩包文件传输到另外一个位置,并进行解压,希望已经传输成功的文件,下次任务运行时不会再重复传输。

    注:若用户希望将已解压的文件(csv、excel、txt)数据,进行读取入库,方案请参见:获取目录下符合条件的文件,按写入方式读取入库并记录状态

    1.2 实现思路

    1)将需要进行传输的压缩包对应的 .ok 文件名称输出为参数。

    2)需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输:

    • 2.1:由于 .ok 文件将迁移到日期命名的文件夹中(格式为yyyymmdd),构建日期序列、源压缩文件存储位置、.ok文件将要迁移到的具体目录,并输出为参数。

    • 2.2:「循环容器」节点遍历 2.1 步骤中生成的参数,使用「Shell脚本」节点在对应的目标服务器上构建和来源 FTP 服务器相同的日期文件夹,将来源 FTP 服务器上的部分文件(使用第 1 步的参数进行过滤)传输到目标服务器上。

    • 2.3:.ok 文件传输完成后,使用「Shell脚本」遍历 .ok 文件所在的目标文件夹目录,输出 csv 文件记录已经下发成功的 .ok 文件名称和路径。

    3)读取 2.3 步骤生成的 csv ,获取 .ok 文件名、文件路径、上层日期文件夹名称,输出到 DB 表中,进行文件下载记录。

    4)读取 3 步骤 DB 表中的数据,构建源压缩文件路径、压缩文件将要迁移到的位置、日期、需要传输的文件名集合,进行参数输出。

    5)最后循环遍历步骤 4 中的参数,在压缩文件将要迁移到的位置中构建日期文件夹,将最终下发完成的压缩文件、.ok 文件全部输出到目标端对应的目录下。传输成功之后,调用「Shell脚本解压压缩文件,在第 3 步的 DB 表中标记传输完成的文件。

    2. 示例

    2.1 场景模拟

    示例文件结构:cs.zip待传输文件名称.zip

    /data/demo_FR/cs目录下,存在以日期命名的文件夹,里面包含压缩文件,压缩文件被成功保存后,会生成一个同名的 .ok 文件。如下图所示:

    1726040875416382.png

    表名.csv表格中,记录着将要传输的压缩文件生成的 .ok 文件的名称。如下图所示:

    1726041156532624.png

    现在需要将表名.csv表格中,对应的压缩文件传输到另外一个路径下(/data/demo_files/file2下的日期文件夹中),并进行解压,且已经传输成功文件,下次任务运行时不会再重复传输。

    2.2 方案说明

    具体思路可参考本文 1.2 节内容,任务设计请看下图:

    40.png

    2.3 准备工作

    1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。

    2)由于压缩文件存储在 FTP 服务器上,建议准备一个 FTP/SFTP 数据连接。

    3)本文方案中需要用到 4 个 .sh 文件,具体代码可下载进行查看:.sh文件.zip

    2.4 迁移.ok文件

    需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输。

    11.png

    2.4.1 将待迁移压缩文件对应的.ok文件名称输出为参数

    1)新建定时任务,拖入数据转换节点,进入数据转换节点。

    2)拖入文件输入算子,读取表名.csv数据,如下图所示:

    8.png

    点击「数据预览」,如下图所示:

    1726042133171366.png

    3)拖入参数输出算子,将读取的数据输出为参数 name。如下图所示:

    注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

    1726042221174475.png

    2.4.2 构建参数

    .ok文件将迁移到日期命名的文件夹中(格式为yyyymmdd),本节目的,输出参数:

    • date:日期序列,便于后续步骤中构建与压缩文件所在目录相同的日期文件夹。

    • resources:源压缩文件存储位置。

    • target:构建 .ok 文件将要迁移到的具体目录。

    1)拖入数据转换节点,进入数据转换节点。

    2)拖入Spark SQL算子,输入语句设置日期序列开始时间和结束时间。如下图所示:

    输出两个日期:当年当月第一天日期、当年当月当天日期。

    SELECT CONCAT(LEFT(current_date,7),"-01") as s_date, LEFT(current_date,10) as e_date

    13.png

    点击「数据预览」,如下图所示:

    1726042802433038.png

    3)再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列、源压缩文件存储位置(/data/demo_FR/cs/)、.ok 文件将要迁移到的具体目录(/data/demo_files/date1/),用户根据实际情况修改 SQL 语句中的路径。

    注:SQL 语句中的 Spark SQL 为点击生成。

    select concat('/data/demo_FR/cs/',date_format(date,'yyyyMMdd')) as date,date_format(date,'yyyyMMdd') as time,concat('/data/demo_files/date1/',date_format(date,'yyyyMMdd')) as target

      from (

    SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL

      ) a

    15.png

    点击数据预览,如下图所示:

    16.png

    4)拖入参数输出算子,输出参数:resource、date、target。如下图所示:

    注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

    1726043312101234.png

    2.4.3 循环遍历参数,迁移.ok文件

    本节目的:

    • 使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。

    • 遍历压缩包所在路径/data/demo_FR/cs下,符合条件的 .ok 文件(用 2.4.1 节生成的参数name过滤文件),传输到/data/demo_files/date1下的日期文件夹中。

    1)拖入「循环容器」节点,遍历 2.4.2 节生成的参数。界面设置如下图所示:

    实际场景中,压缩包所在文件夹中,有的压缩包下发完成,有的可能为空(本文示例中是:存在压缩包的文件夹有 3 个,但参数值有 11 个),当使用参数传输文件时,如果有一个来源端没有符合条件的文件存在,「循环容器」节点会报错不再执行,所以需要勾选节点报错时,继续执行循环按钮,使所有参数都能执行。

    1726043718962461.png

    2)「循环容器」节点中拖入Shell脚本节点,Shell脚节点中,使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。如下图所示:

    okmkdir.sh 文件代码如下所示:

    注:full_path="/data/demo_files/date1/$folder_name"  中,/data/demo_files/date1需要根据实际情况修改,为.ok文件将要迁移的位置。

    #!/bin/sh  
      
    # 检查是否提供了足够的参数  
    if [ "$#" -ne 1 ]; then  
        echo "Usage: $0 <folder_name>"  
        exit 1  
    fi  
      
    # 读取第一个参数作为文件夹名  
    folder_name=$1  
    target=$2
    # 构造完整的路径  
    full_path="/data/demo_files/date1/$folder_name"  
      
    # 使用mkdir -p创建文件夹,确保父目录存在  
    mkdir -p "$full_path"  
      
    # 检查文件夹是否成功创建  
    if [ -d "$full_path" ]; then  
        echo "Folder '$full_path' created successfully."  
    else  
        echo "Failed to create folder '$full_path'."  
    fi

    19.png

    3)拖入「文件传输」算子,将源压缩文件存储位置中(resources参数值)的.ok文件(使用name参数值进行过滤),传输到目标文件夹中(target参数值)。如下图所示:

    20.png

    2.5 准备判断压缩文件是否迁移成功的标记表

    .ok 文件传输到目标文件夹后:

    • 使用「Shell脚本」遍历各个日期文件夹下已经下发完成的 .ok 文件,并生成 date.csv 记录 .ok 文件的路径、.ok 文件的名称。

    • 读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,将数据写入到 ok_1 表;ok_1 表中存在字段 flag ,若值为空,则代表 .ok 对应的 zip 文件还没有进行下载,需要进行下载,下载完成后标记 flag 字段。

    本节主要准备 ok_1 表。

    39.png

    2.5.1 生成csv表记录.ok文件的路径、名称

    1)由于我们要遍历各个日期文件夹下已经下发完成的 .ok 文件,需要指定遍历目录。

    在「参数列表」中新建参数 okfile,值为.ok文件目标文件夹的上一层目录,例如:.ok文件存储在/data/demo_files/date1/20240901,参数值为:/data/demo_files/date1,便于后续遍历/data/demo_files/date1下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

    21.png

    2)拖入Shell脚本节点,引用 okfile 参数,后续遍历 okfile 参数值下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

    需注意:

    Shell脚本节点与循环容器节点的连线,判断条件应该是无条件执行。原因:循环容器节点参数取数时,可能取不到值导致循环容器节点失败,为正常现象,但不能影响后续节点的运行。

    38.png

    3)Shell脚本节点设置界面如下图所示:

    okfile.sh 代码如下:

    注:代码含义:找到参数值下的 .ok、.sql、.del文件,输出文件路径和名称到 /data/demo_files/test 路径下的 date.csv 文件中;本文示例中,只有 .ok 文件,代码不影响方案实现;date.csv 的路径用户根据实际情况自定义。

    #!/bin/bash  
      
    # 指定要搜索的目录  
    #SEARCH_DIR="/data/demo_files/cs/2024-04-17/"
     SEARCH_DIR=$1 
    # 指定输出CSV文件的名称  
    OUTPUT_CSV="/data/demo_files/test/date.csv"  
      
    # 使用find命令搜索文件,并将结果输出到CSV文件  
    find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"  
      
    echo "搜索完成,结果已保存到 $OUTPUT_CSV"

    22.png

    3)由于后续节点需要用到 date.csv 文件数据,所以需要运行任务生成 date.csv 文件,点击「运行」按钮即可。

    2.5.2 生成记录表

    读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,并将数据写入到 ok_1 表中进行记录。

    1)拖入「数据转换」节点,进入数据转换节点。

    2)拖入「文件输入」算子,读取 date.csv 文件。如下图所示:

    23.png

    点击「数据预览」,如下图所示:

    24.png

    3)拖入「新增计算列」算子,截取 column 列中的日期。如下图所示:

    注:公式中的 column、column1 为点击生成。

    left(replace(replace(column,'/data/demo_files/date1/',""),column1,""),8)

    25.png

    点击数据预览」,如下图所示:

    resource 字段后续会被Shell 脚本引用,在压缩文件的目标位置创建日期文件夹。

    26.png

    4)拖入字段设置算子,修改字段名。如下图所示:

    27.png

    5)拖入DB表输出算子,创建标记表 ok_1。如下图所示:

    28.png

    6)由于后续节点需要读取 ok_1 表,所以右键点击「数据转换」节点,选择「运行节点」,生成 ok_1 表。如下图所示:

    37.png

    7)进入 ok_1 表所在数据库中,给 ok_1 表新增字段 flag ,值默认为空,代表 .ok 对应的 zip 文件还没有进行传输。

    后续步骤中,会传输并解压 zip 文件,且将 ok_1 表中 flag 字段标记为 1 。

    2.6 传输并解压压缩文件

    本节目的:

    1)ok_1 表中过滤出 flag 为空的数据,代表 .ok 对应的 zip 文件还没有进行传输;处理 ok_1 表中的数据,输出参数:

    • ${date1}:日期,值示例:20240902(对应 ok_1 表中的 resource 字段)。

    • ${resource_1}:源压缩文件路径(由/data/demo_FR/cs/和 ok_1 表中的 resource 字段拼接而成)。

    • ${target_1}:压缩文件将要迁移到的位置(由/data/demo_files/file2/和 ok_1 表中的 resource 字段拼接而成可自定义,修改/data/demo_files/file2/即可。

    • ${name_1}:需要传输的文件名集合。

    30.png

    2.6.1 参数准备

    1)拖入「数据转换」节点,进入「数据转换」节点。

    2)拖入DB表输入算子,过滤出 ok_1 表中 flag 字段为 null 的数据(zip 文件未被传输的数据)。如下图所示:

    SELECT name,resource FROM `demotest`.`ok_1`
    where flag is  null

    31.png

    点击数据预览,如下图所示:

    1726109487495634.png

    3)拖入「Spark SQL算子,处理 ok_1 表中的数据。如下图所示:

    32.png

    注:SQL 语句中的「DB表输入」为点击生成。

    select  concat_ws(',',collect_set(name)) AS swcitylist ,resource as date,concat('/data/demo_FR/cs/',resource) as resource,concat('/data/demo_files/file2/',resource) as target from 

      (

    select replace(name,"ok","zip") as name,resource from DB表输入

      union all  

      select name,resource from DB表输入

      )

    group by resource

    数据处理前如下图所示:

    1726109672574672.png

    数据处理语句介绍如下表所示:

    SQL
    说明

    select replace(name,"ok","zip") as name,resource from DB表输入

      union all  

      select name,resource from DB表输入

    目的:

    数据未处理前,name 列只有 .ok 文件的名称,由于我们要传输 .ok 文件以及对应的 zip 文件(zip 文件与 .ok 文件同名),所以需要构建 zip 文件名称

    步骤:

    使用 replace 函数,将 ok 替换为 zip,替换后的 name 字段与替换前的 name 字段,合并(也读取合并了 resource 字段,resource 字段不做处理

    select  concat_ws(',',collect_set(name)) AS swcitylist


    从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

    创建字段 swcitylist ,值为:

    根据 resource 字段,先使用 collect_set 函数对 name 字段进行去重,然后使用 concat_ws 函数将去重后的数据按照逗号进行拼接

    目的:

    构建需要传输的文件名集合

    concat('/data/demo_FR/cs/',resource) as resource


    从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

    创建字段 resource ,值为

    将固定路径/data/demo_FR/cs/与待处理 resource 字段拼接

    目的:

    构建源压缩文件的位置,用户根据实际情况修改/data/demo_FR/cs/

    concat('/data/demo_files/file2/',resource) as target


    从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

    创建字段 target ,值为

    将固定路径/data/demo_files/file2/与待处理 resource 字段拼接

    目的:

    构建压缩文件目标位置,用户可自定义/data/demo_files/file2/的值

    数据处理后,结果如下图所示:

    34.png

    4)拖入「参数输出」算子,输出参数。如下图所示:

    1726111123739451.png

    2.6.2 传输解压压缩文件

    1)需要拖入「循环容器」节点,循环遍历 2.6.1 节中的参数。如下图所示:

    「节点报错时,继续执行循环」按钮,可不勾选。

    36.png

    2)循环容器节点中拖入Shell脚本节点,Shell脚本节点引用 date1 参数,在压缩文件要传输的目标端创建日期文件夹。如下图所示:

    mkdir.sh 代码如下所示:

    注:full_path="/data/demo_files/file2/$folder_name" 中,/data/demo_files/file2/根据实际情况修改为压缩包的目标路径。

    #!/bin/sh  
      
    # 检查是否提供了足够的参数  
    if [ "$#" -ne 1 ]; then  
        echo "Usage: $0 <folder_name>"  
        exit 1  
    fi  
      
    # 读取第一个参数作为文件夹名  
    folder_name=$1  
      
    # 构造完整的路径  
    full_path="/data/demo_files/file2/$folder_name"  
      
    # 使用mkdir -p创建文件夹,确保父目录存在  
    mkdir -p "$full_path"  
      
    # 检查文件夹是否成功创建  
    if [ -d "$full_path" ]; then  
        echo "Folder '$full_path' created successfully."  
    else  
        echo "Failed to create folder '$full_path'."  
    fi

    41.png

    3)拖入「文件传输」算子,将${resource_1}来源端路径下,${name_1}文件名集合中的文件,传输到${target_1}路径下。如下图所示:

    42.png

    4)拖入Shell脚本节点,将 ${target_1} 路径下的 zip 文件解压到 ${target_1} 路径下。如下图所示:

    zip.sh 代码如下所示:

    #!/bin/sh  
      
    # 检查是否提供了足够的参数  
    if [ "$#" -ne 2 ]; then  
        echo "Usage: $0 <directory_path> <output_directory>"  
        exit 1  
    fi  
      
    # 读取目录路径和解压目标目录  
    directory_path=$1  
    output_directory=$2  
      
    # 确保输出目录存在  
    mkdir -p "$output_directory"  
      
    # 遍历目录下的所有ZIP文件并解压到指定目录  
    find "$directory_path" -type f -name "*.zip" -exec unzip -o {} -d "$output_directory" \;  
      
    # 注意:这里的-o选项可能不起作用,具体取决于unzip的版本。  
    # 如果unzip没有-o选项用于覆盖,你可能需要移除它或处理已存在的文件。

    43.png

    注:图中的第一个参数代表目标端 zip 文件位置,第二个参数代表 zip 文件解压后的位置,用户根据实际情况设置第二个参数,本文示例是将解压后的 zip 文件与解压前放在一起。

    5)拖入「SQL脚本」,已经传输成功的 zip 文件,在 ok_1 表中记录 flag 为 1。如下图所示:

    update `demotest`.`ok_1`
    set flag =1
    where resource='${date1}'

    44.png

    2.7 效果查看

    1)运行任务,任务运行成功后,日志如下图所示:

    本文示例中,第一个「循环容器」节点内,文件传输时如果有一个参数对应的来源端没有符合条件的文件存在,循环容器节点状态会为失败,由于我们实际的文件为 3 个,但构建的文件夹为 11 个,肯定会存在取不到数据的情况,为正常现象。

    45.png

    2)本文示例中,将 .ok 文件传输到/data/demo_files/date1下,任务运行后,该目录下内容如下图所示:

    46.png

    3)压缩文件传输到/data/demo_files/file2下,并在该目录下解压。如下图所示:

    47.png

    4)标记表 ok_1 如下图所示:

    1726121811825788.png

    2.8 后续步骤

    1)点击「发布」按钮,将任务发布到 生产模式 下。

    2)根据实际情况设置执行频率。详情请参见:调度计划概述




    附件列表


    主题: 数据开发-定时任务
    • 有帮助
    • 没帮助
    • 只是浏览
    • 评价文档,奖励 1 ~ 100 随机 F 豆!

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持