历史版本4 :FTP服务器中白名单文件自动传输与解压 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 应用场景

用户的业务数据以压缩包的形式每日进行下发,存储在 FTP 文件服务器对应的日期目录下,已经下发完成的压缩包文件会生成同名的 .ok 文件。

用户需要将部分压缩包文件传输到另外一个位置,并进行解压,希望已经传输成功的文件,下次任务运行时不会再重复传输。

注:若用户希望将已解压的文件(csv、excel、txt)数据,进行读取入库,方案请参见:获取目录下符合条件的文件,按写入方式读取入库并记录状态

1.2 实现思路

1)将需要进行传输的压缩包对应的 .ok 文件名称输出为参数。

2)需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输:

  • 2.1:由于 .ok 文件将迁移到日期命名的文件夹中(格式为yyyymmdd),构建日期序列、源压缩文件存储位置、.ok文件将要迁移到的具体目录,并输出为参数。

  • 2.2:「循环容器」节点遍历 2.1 步骤中生成的参数,使用「Shell脚本」节点在对应的目标服务器上构建和来源 FTP 服务器相同的日期文件夹,将来源 FTP 服务器上的部分文件(使用第 1 步的参数进行过滤)传输到目标服务器上。

  • 2.3:.ok 文件传输完成后,使用「Shell脚本」遍历 .ok 文件所在的目标文件夹目录,输出 csv 文件记录已经下发成功的 .ok 文件名称和路径。

3)读取 2.3 步骤生成的 csv ,获取 .ok 文件名、文件路径、上层日期文件夹名称,输出到 DB 表中,进行文件下载记录。

4)读取 3 步骤 DB 表中的数据,构建源压缩文件路径、压缩文件将要迁移到的位置、日期、需要传输的文件名集合,进行参数输出。

5)最后循环遍历步骤 4 中的参数,在压缩文件将要迁移到的位置中构建日期文件夹,将最终下发完成的压缩文件、.ok 文件全部输出到目标端对应的目录下。传输成功之后,调用「Shell脚本解压压缩文件,在第 3 步的 DB 表中标记传输完成的文件。

2. 示例编辑

2.1 场景模拟

示例文件结构:cs.zip待传输文件名称.zip

/data/demo_FR/cs目录下,存在以日期命名的文件夹,里面包含压缩文件,压缩文件被成功保存后,会生成一个同名的 .ok 文件。如下图所示:

1726040875416382.png

表名.csv表格中,记录着将要传输的压缩文件生成的 .ok 文件的名称。如下图所示:

1726041156532624.png

现在需要将表名.csv表格中,对应的压缩文件传输到另外一个路径下(/data/demo_files/file2下的日期文件夹中),并进行解压,且已经传输成功文件,下次任务运行时不会再重复传输。

2.2 方案说明

具体思路可参考本文 1.2 节内容,任务设计请看下图:

12.png

2.3 准备工作

1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。

2)由于压缩文件存储在 FTP 服务器上,建议准备一个 FTP/SFTP 数据连接。

3)本文方案中需要用到 4 个 .sh 文件,具体代码可下载进行查看:.sh文件.zip

2.4 迁移.ok文件

需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输。

11.png

2.4.1 将待迁移压缩文件对应的.ok文件名称输出为参数

1)新建定时任务,拖入数据转换节点,进入数据转换节点。

2)拖入文件输入算子,读取表名.csv数据,如下图所示:

8.png

点击「数据预览」,如下图所示:

1726042133171366.png

3)拖入参数输出算子,将读取的数据输出为参数 name。如下图所示:

注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

1726042221174475.png

2.4.2 构建参数

.ok文件将迁移到日期命名的文件夹中(格式为yyyymmdd),本节目的,输出参数:

  • date:日期序列,便于后续步骤中构建与压缩文件所在目录相同的日期文件夹。

  • resources:源压缩文件存储位置。

  • target:构建 .ok 文件将要迁移到的具体目录。

1)拖入数据转换节点,进入数据转换节点。

2)拖入Spark SQL算子,输入语句设置日期序列开始时间和结束时间。如下图所示:

输出两个日期:当年当月第一天日期、当年当月当天日期。

SELECT CONCAT(LEFT(current_date,7),"-01") as s_date, LEFT(current_date,10) as e_date

13.png

点击「数据预览」,如下图所示:

1726042802433038.png

3)再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列、源压缩文件存储位置(/data/demo_FR/cs/)、.ok 文件将要迁移到的具体目录(/data/demo_files/date1/),用户根据实际情况修改 SQL 语句中的路径。

注:SQL 语句中的 Spark SQL 为点击生成。

select concat('/data/demo_FR/cs/',date_format(date,'yyyyMMdd')) as date,date_format(date,'yyyyMMdd') as time,concat('/data/demo_files/date1/',date_format(date,'yyyyMMdd')) as target

  from (

SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL

  ) a

15.png

点击数据预览,如下图所示:

16.png

4)拖入参数输出算子,输出参数:resource、date、target。如下图所示:

注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

1726043312101234.png

2.4.3 循环遍历参数,迁移.ok文件

本节目的:

  • 使用 date 日期参数,在.ok文件将要迁移的位置中,创建日期格式的文件夹。

  • 遍历压缩包所在路径/data/demo_FR/cs下,符合条件的 .ok 文件(用 2.4.1 节生成的参数name过滤文件),传输到/data/demo_files/date1下的日期文件夹中。

1)拖入「循环容器」节点,遍历 2.4.2 节生成的参数。界面设置如下图所示:

实际场景中,压缩包所在文件夹中,有的压缩包下发完成,有的可能为空(本文示例中是:存在压缩包的文件夹有 3 个,但参数值有 11 个),当使用参数传输文件时,如果有一个来源端没有符合条件的文件存在,「循环容器」节点会报错不再执行,所以需要勾选节点报错时,继续执行循环按钮,使所有参数都能执行。

1726043718962461.png

2)「循环容器」节点中拖入Shell脚本节点,Shell脚节点中,使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。如下图所示:

okmkdir.sh 文件代码如下所示:

注:full_path="/data/demo_files/date1/$folder_name"  中,/data/demo_files/date1需要根据实际情况修改,为.ok文件将要迁移的位置。

#!/bin/sh  
  
# 检查是否提供了足够的参数  
if [ "$#" -ne 1 ]; then  
    echo "Usage: $0 <folder_name>"  
    exit 1  
fi  
  
# 读取第一个参数作为文件夹名  
folder_name=$1  
target=$2
# 构造完整的路径  
full_path="/data/demo_files/date1/$folder_name"  
  
# 使用mkdir -p创建文件夹,确保父目录存在  
mkdir -p "$full_path"  
  
# 检查文件夹是否成功创建  
if [ -d "$full_path" ]; then  
    echo "Folder '$full_path' created successfully."  
else  
    echo "Failed to create folder '$full_path'."  
fi

19.png

3)拖入「文件传输」算子,将源压缩文件存储位置中(resources参数值)的.ok文件(使用name参数值进行过滤),传输到目标文件夹中(target参数值)。如下图所示:

20.png

2.5 准备判断压缩文件是否迁移成功的标记表

.ok 文件传输到目标文件夹后:

  • 使用「Shell脚本」遍历各个日期文件夹下已经下发完成的 .ok 文件,并生成 date.csv 记录 .ok 文件的路径、.ok 文件的名称。

  • 读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,将数据写入到 ok_1 表;ok_1 表中存在字段 flag ,若值为空,则代表 .ok 对应的 zip 文件还没有进行下载,需要进行下载,下载完成后标记 flag 字段。

本节主要准备 ok_1 表。

12.png

2.5.1 生成csv表记录.ok文件的路径、名称

1)由于我们要遍历各个日期文件夹下已经下发完成的 .ok 文件,需要指定遍历目录。

在「参数列表」中新建参数 okfile,值为.ok文件目标文件夹的上一层目录,例如:.ok文件存储在/data/demo_files/date1/20240901,参数值为:/data/demo_files/date1,便于后续遍历/data/demo_files/date1下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

21.png

2)拖入Shell脚本节点,引用 okfile 参数,后续遍历 okfile 参数值下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

需注意:

Shell脚本节点与循环容器节点的连线,判断条件应该是无条件执行。原因:循环容器节点参数取数时,可能取不到值导致循环容器节点失败,为正常现象,但不能影响后续节点的运行。

12.png

3)Shell脚本节点设置界面如下图所示:

okfile.sh 代码如下:

注:代码含义:找到参数值下的 .ok、.sql、.del文件,输出文件路径和名称到 /data/demo_files/test 路径下的 date.csv 文件中;本文示例中,只有 .ok 文件,代码不影响方案实现;date.csv 的路径用户根据实际情况自定义。

#!/bin/bash  
  
# 指定要搜索的目录  
#SEARCH_DIR="/data/demo_files/cs/2024-04-17/"
 SEARCH_DIR=$1 
# 指定输出CSV文件的名称  
OUTPUT_CSV="/data/demo_files/test/date.csv"  
  
# 使用find命令搜索文件,并将结果输出到CSV文件  
find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"  
  
echo "搜索完成,结果已保存到 $OUTPUT_CSV"

22.png

3)由于后续节点需要用到 date.csv 文件数据,所以需要运行任务生成 date.csv 文件,点击「运行」按钮即可。

2.5.2 生成记录表

读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,并将数据写入到 ok_1 表中进行记录。

1)拖入「数据转换」节点,进入数据转换节点。

2)拖入「文件输入」算子,读取 date.csv 文件。如下图所示:

23.png

点击「数据预览」,如下图所示:

24.png

3)拖入「新增计算列」算子,截取 column 列中的日期。如下图所示:

left(replace(replace(column,'/data/demo_files/date1/',""),column1,""),8)

25.png

点击数据预览」,如下图所示: