历史版本5 :FTP服务器中白名单文件自动传输与解压 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 应用场景

用户的业务数据以压缩包的形式每日进行下发,存储在 FTP 文件服务器对应的日期目录下,已经下发完成的压缩包文件会生成同名的 .ok 文件。

用户需要将部分压缩包文件传输到另外一个位置,并进行解压,希望已经传输成功的文件,下次任务运行时不会再重复传输。

注:若用户希望将已解压的文件(csv、excel、txt)数据,进行读取入库,方案请参见:获取目录下符合条件的文件,按写入方式读取入库并记录状态

1.2 实现思路

1)将需要进行传输的压缩包对应的 .ok 文件名称输出为参数。

2)需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输:

  • 2.1:由于 .ok 文件将迁移到日期命名的文件夹中(格式为yyyymmdd),构建日期序列、源压缩文件存储位置、.ok文件将要迁移到的具体目录,并输出为参数。

  • 2.2:「循环容器」节点遍历 2.1 步骤中生成的参数,使用「Shell脚本」节点在对应的目标服务器上构建和来源 FTP 服务器相同的日期文件夹,将来源 FTP 服务器上的部分文件(使用第 1 步的参数进行过滤)传输到目标服务器上。

  • 2.3:.ok 文件传输完成后,使用「Shell脚本」遍历 .ok 文件所在的目标文件夹目录,输出 csv 文件记录已经下发成功的 .ok 文件名称和路径。

3)读取 2.3 步骤生成的 csv ,获取 .ok 文件名、文件路径、上层日期文件夹名称,输出到 DB 表中,进行文件下载记录。

4)读取 3 步骤 DB 表中的数据,构建源压缩文件路径、压缩文件将要迁移到的位置、日期、需要传输的文件名集合,进行参数输出。

5)最后循环遍历步骤 4 中的参数,在压缩文件将要迁移到的位置中构建日期文件夹,将最终下发完成的压缩文件、.ok 文件全部输出到目标端对应的目录下。传输成功之后,调用「Shell脚本解压压缩文件,在第 3 步的 DB 表中标记传输完成的文件。

2. 示例编辑

2.1 场景模拟

示例文件结构:cs.zip待传输文件名称.zip

/data/demo_FR/cs目录下,存在以日期命名的文件夹,里面包含压缩文件,压缩文件被成功保存后,会生成一个同名的 .ok 文件。如下图所示:

1726040875416382.png

表名.csv表格中,记录着将要传输的压缩文件生成的 .ok 文件的名称。如下图所示:

1726041156532624.png

现在需要将表名.csv表格中,对应的压缩文件传输到另外一个路径下(/data/demo_files/file2下的日期文件夹中),并进行解压,且已经传输成功文件,下次任务运行时不会再重复传输。

2.2 方案说明

具体思路可参考本文 1.2 节内容,任务设计请看下图:

40.png

2.3 准备工作

1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。

2)由于压缩文件存储在 FTP 服务器上,建议准备一个 FTP/SFTP 数据连接。

3)本文方案中需要用到 4 个 .sh 文件,具体代码可下载进行查看:.sh文件.zip

2.4 迁移.ok文件

需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输。

11.png

2.4.1 将待迁移压缩文件对应的.ok文件名称输出为参数

1)新建定时任务,拖入数据转换节点,进入数据转换节点。

2)拖入文件输入算子,读取表名.csv数据,如下图所示:

8.png

点击「数据预览」,如下图所示:

1726042133171366.png

3)拖入参数输出算子,将读取的数据输出为参数 name。如下图所示:

注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

1726042221174475.png

2.4.2 构建参数

.ok文件将迁移到日期命名的文件夹中(格式为yyyymmdd),本节目的,输出参数:

  • date:日期序列,便于后续步骤中构建与压缩文件所在目录相同的日期文件夹。

  • resources:源压缩文件存储位置。

  • target:构建 .ok 文件将要迁移到的具体目录。

1)拖入数据转换节点,进入数据转换节点。

2)拖入Spark SQL算子,输入语句设置日期序列开始时间和结束时间。如下图所示:

输出两个日期:当年当月第一天日期、当年当月当天日期。

SELECT CONCAT(LEFT(current_date,7),"-01") as s_date, LEFT(current_date,10) as e_date

13.png

点击「数据预览」,如下图所示:

1726042802433038.png

3)再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列、源压缩文件存储位置(/data/demo_FR/cs/)、.ok 文件将要迁移到的具体目录(/data/demo_files/date1/),用户根据实际情况修改 SQL 语句中的路径。

注:SQL 语句中的 Spark SQL 为点击生成。

select concat('/data/demo_FR/cs/',date_format(date,'yyyyMMdd')) as date,date_format(date,'yyyyMMdd') as time,concat('/data/demo_files/date1/',date_format(date,'yyyyMMdd')) as target

  from (

SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL

  ) a

15.png

点击数据预览,如下图所示:

16.png

4)拖入参数输出算子,输出参数:resource、date、target。如下图所示:

注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。

1726043312101234.png

2.4.3 循环遍历参数,迁移.ok文件

本节目的:

  • 使用 date 日期参数,在.ok文件将要迁移的位置中,创建日期格式的文件夹。

  • 遍历压缩包所在路径/data/demo_FR/cs下,符合条件的 .ok 文件(用 2.4.1 节生成的参数name过滤文件),传输到/data/demo_files/date1下的日期文件夹中。

1)拖入「循环容器」节点,遍历 2.4.2 节生成的参数。界面设置如下图所示:

实际场景中,压缩包所在文件夹中,有的压缩包下发完成,有的可能为空(本文示例中是:存在压缩包的文件夹有 3 个,但参数值有 11 个),当使用参数传输文件时,如果有一个来源端没有符合条件的文件存在,「循环容器」节点会报错不再执行,所以需要勾选节点报错时,继续执行循环按钮,使所有参数都能执行。

1726043718962461.png

2)「循环容器」节点中拖入Shell脚本节点,Shell脚节点中,使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。如下图所示:

okmkdir.sh 文件代码如下所示:

注:full_path="/data/demo_files/date1/$folder_name"  中,/data/demo_files/date1需要根据实际情况修改,为.ok文件将要迁移的位置。

#!/bin/sh  
  
# 检查是否提供了足够的参数  
if [ "$#" -ne 1 ]; then  
    echo "Usage: $0 <folder_name>"  
    exit 1  
fi  
  
# 读取第一个参数作为文件夹名  
folder_name=$1  
target=$2
# 构造完整的路径  
full_path="/data/demo_files/date1/$folder_name"  
  
# 使用mkdir -p创建文件夹,确保父目录存在  
mkdir -p "$full_path"  
  
# 检查文件夹是否成功创建  
if [ -d "$full_path" ]; then  
    echo "Folder '$full_path' created successfully."  
else  
    echo "Failed to create folder '$full_path'."  
fi

19.png

3)拖入「文件传输」算子,将源压缩文件存储位置中(resources参数值)的.ok文件(使用name参数值进行过滤),传输到目标文件夹中(target参数值)。如下图所示:

20.png

2.5 准备判断压缩文件是否迁移成功的标记表

.ok 文件传输到目标文件夹后:

  • 使用「Shell脚本」遍历各个日期文件夹下已经下发完成的 .ok 文件,并生成 date.csv 记录 .ok 文件的路径、.ok 文件的名称。

  • 读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,将数据写入到 ok_1 表;ok_1 表中存在字段 flag ,若值为空,则代表 .ok 对应的 zip 文件还没有进行下载,需要进行下载,下载完成后标记 flag 字段。

本节主要准备 ok_1 表。

39.png

2.5.1 生成csv表记录.ok文件的路径、名称

1)由于我们要遍历各个日期文件夹下已经下发完成的 .ok 文件,需要指定遍历目录。

在「参数列表」中新建参数 okfile,值为.ok文件目标文件夹的上一层目录,例如:.ok文件存储在/data/demo_files/date1/20240901,参数值为:/data/demo_files/date1,便于后续遍历/data/demo_files/date1下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

21.png

2)拖入Shell脚本节点,引用 okfile 参数,后续遍历 okfile 参数值下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。

需注意:

Shell脚本节点与循环容器节点的连线,判断条件应该是无条件执行。原因:循环容器节点参数取数时,可能取不到值导致循环容器节点失败,为正常现象,但不能影响后续节点的运行。

38.png

3)Shell脚本节点设置界面如下图所示:

okfile.sh 代码如下:

注:代码含义:找到参数值下的 .ok、.sql、.del文件,输出文件路径和名称到 /data/demo_files/test 路径下的 date.csv 文件中;本文示例中,只有 .ok 文件,代码不影响方案实现;date.csv 的路径用户根据实际情况自定义。

#!/bin/bash  
  
# 指定要搜索的目录  
#SEARCH_DIR="/data/demo_files/cs/2024-04-17/"
 SEARCH_DIR=$1 
# 指定输出CSV文件的名称  
OUTPUT_CSV="/data/demo_files/test/date.csv"  
  
# 使用find命令搜索文件,并将结果输出到CSV文件  
find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"  
  
echo "搜索完成,结果已保存到 $OUTPUT_CSV"

22.png

3)由于后续节点需要用到 date.csv 文件数据,所以需要运行任务生成 date.csv 文件,点击「运行」按钮即可。

2.5.2 生成记录表

读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,并将数据写入到 ok_1 表中进行记录。

1)拖入「数据转换」节点,进入数据转换节点。

2)拖入「文件输入」算子,读取 date.csv 文件。如下图所示:

23.png

点击「数据预览」,如下图所示:

24.png

3)拖入「新增计算列」算子,截取 column 列中的日期。如下图所示:

注:公式中的 column、column1 为点击生成。

left(replace(replace(column,'/data/demo_files/date1/',""),column1,""),8)

25.png

点击数据预览」,如下图所示:

resource 字段后续会被Shell 脚本引用,在压缩文件的目标位置创建日期文件夹。

26.png

4)拖入字段设置算子,修改字段名。如下图所示:

27.png

5)拖入DB表输出算子,创建标记表 ok_1。如下图所示:

28.png

6)由于后续节点需要读取 ok_1 表,所以右键点击「数据转换」节点,选择「运行节点」,生成 ok_1 表。如下图所示:

37.png

7)进入 ok_1 表所在数据库中,给 ok_1 表新增字段 flag ,值默认为空,代表 .ok 对应的 zip 文件还没有进行传输。

后续步骤中,会传输并解压 zip 文件,且将 ok_1 表中 flag 字段标记为 1 。

2.6 传输并解压压缩文件

本节目的:

1)ok_1 表中过滤出 flag 为空的数据,代表 .ok 对应的 zip 文件还没有进行传输;处理 ok_1 表中的数据,输出参数:

  • ${date1}:日期,值示例:20240902(对应 ok_1 表中的 resource 字段)。

  • ${resource_1}:源压缩文件路径(由/data/demo_FR/cs/和 ok_1 表中的 resource 字段拼接而成)。

  • ${target_1}:压缩文件将要迁移到的位置(由/data/demo_files/file2/和 ok_1 表中的 resource 字段拼接而成可自定义,修改/data/demo_files/file2/即可。

  • ${name_1}:需要传输的文件名集合。

30.png

2.6.1 参数准备

1)拖入「数据转换」节点,进入「数据转换」节点。

2)拖入DB表输入算子,过滤出 ok_1 表中 flag 字段为 null 的数据(zip 文件未被传输的数据)。如下图所示:

SELECT name,resource FROM `demotest`.`ok_1`
where flag is  null

31.png

点击数据预览,如下图所示:

1726109487495634.png

3)拖入「Spark SQL算子,处理 ok_1 表中的数据。如下图所示:

32.png

注:SQL 语句中的「DB表输入」为点击生成。

select  concat_ws(',',collect_set(name)) AS swcitylist ,resource as date,concat('/data/demo_FR/cs/',resource) as resource,concat('/data/demo_files/file2/',resource) as target from 

  (

select replace(name,"ok","zip") as name,resource from DB表输入

  union all  

  select name,resource from DB表输入

  )

group by resource

数据处理前如下图所示:

1726109672574672.png

数据处理语句介绍如下表所示:

SQL
说明

select replace(name,"ok","zip") as name,resource from DB表输入

  union all  

  select name,resource from DB表输入

目的:

数据未处理前,name 列只有 .ok 文件的名称,由于我们要传输 .ok 文件以及对应的 zip 文件(zip 文件与 .ok 文件同名),所以需要构建 zip 文件名称

步骤:

使用 replace 函数,将 ok 替换为 zip,替换后的 name 字段与替换前的 name 字段,合并(也读取合并了 resource 字段,resource 字段不做处理

select  concat_ws(',',collect_set(name)) AS swcitylist


从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

创建字段 swcitylist ,值为:

根据 resource 字段,先使用 collect_set 函数对 name 字段进行去重,然后使用 concat_ws 函数将去重后的数据按照逗号进行拼接

目的:

构建需要传输的文件名集合

concat('/data/demo_FR/cs/',resource) as resource


从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

创建字段 resource ,值为

将固定路径/data/demo_FR/cs/与待处理 resource 字段拼接

目的:

构建源压缩文件的位置,用户根据实际情况修改/data/demo_FR/cs/

concat('/data/demo_files/file2/',resource) as target


从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作

创建字段 target ,值为

将固定路径/data/demo_files/file2/与待处理 resource 字段拼接

目的:

构建压缩文件目标位置,用户可自定义/data/demo_files/file2/的值

数据处理后,结果如下图所示:

34.png

4)拖入「参数输出」算子,输出参数。如下图所示:

1726111123739451.png

2.6.2 传输解压压缩文件

1)需要拖入「循环容器」节点,循环遍历 2.6.1 节中的参数。如下图所示:

「节点报错时,继续执行循环」按钮,可不勾选。

36.png

2)循环容器节点中拖入Shell脚本节点,Shell脚本节点引用 date1 参数,在压缩文件要传输的目标端创建日期文件夹。如下图所示:

mkdir.sh 代码如下所示:

注:full_path="/data/demo_files/file2/$folder_name" 中,/data/demo_files/file2/根据实际情况修改为压缩包的目标路径。

#!/bin/sh  
  
# 检查是否提供了足够的参数  
if [ "$#" -ne 1 ]; then  
    echo "Usage: $0 <folder_name>"  
    exit 1  
fi  
  
# 读取第一个参数作为文件夹名  
folder_name=$1  
  
# 构造完整的路径  
full_path="/data/demo_files/file2/$folder_name"  
  
# 使用mkdir -p创建文件夹,确保父目录存在  
mkdir -p "$full_path"  
  
# 检查文件夹是否成功创建  
if [ -d "$full_path" ]; then  
    echo "Folder '$full_path' created successfully."  
else  
    echo "Failed to create folder '$full_path'."  
fi

41.png

3)拖入「文件传输」算子,将${resource_1}来源端路径下,${name_1}文件名集合中的文件,传输到${target_1}路径下。如下图所示:

42.png

4)拖入Shell脚本节点,将 ${target_1} 路径下的 zip 文件解压到 ${target_1} 路径下。如下图所示:

zip.sh 代码如下所示:

#!/bin/sh  
  
# 检查是否提供了足够的参数  
if [ "$#" -ne 2 ]; then  
    echo "Usage: $0 <directory_path> <output_directory>"  
    exit 1  
fi  
  
# 读取目录路径和解压目标目录  
directory_path=$1  
output_directory=$2  
  
# 确保输出目录存在  
mkdir -p "$output_directory"  
  
# 遍历目录下的所有ZIP文件并解压到指定目录  
find "$directory_path" -type f -name "*.zip" -exec unzip -o {} -d "$output_directory" \;  
  
# 注意:这里的-o选项可能不起作用,具体取决于unzip的版本。  
# 如果unzip没有-o选项用于覆盖,你可能需要移除它或处理已存在的文件。

43.png

注:图中的第一个参数代表目标端 zip 文件位置,第二个参数代表 zip 文件解压后的位置,用户根据实际情况设置第二个参数,本文示例是将解压后的 zip 文件与解压前放在一起。

5)拖入「SQL脚本」,已经传输成功的 zip 文件,在 ok_1 表中记录 flag 为 1。如下图所示:

update `demotest`.`ok_1`
set flag =1
where resource='${date1}'

44.png

2.7 效果查看

1)运行任务,任务运行成功后,日志如下图所示:

本文示例中,第一个「循环容器」节点内,文件传输时如果有一个参数对应的来源端没有符合条件的文件存在,循环容器节点状态会为失败,由于我们实际的文件为 3 个,但构建的文件夹为 11 个,肯定会存在取不到数据的情况,为正常现象。

45.png

2)本文示例中,将 .ok 文件传输到/data/demo_files/date1下,任务运行后,该目录下内容如下图所示: