1. 概述
1.1 应用场景
用户的业务数据以压缩包的形式每日进行下发,存储在 FTP 文件服务器对应的日期目录下,已经下发完成的压缩包文件会生成同名的 .ok 文件。
用户需要将部分压缩包文件传输到另外一个位置,并进行解压,希望已经传输成功的文件,下次任务运行时不会再重复传输。
注:若用户希望将已解压的文件(csv、excel、txt)数据,进行读取入库,方案请参见:获取目录下符合条件的文件,按写入方式读取入库并记录状态
1.2 实现思路
1)将需要进行传输的压缩包对应的 .ok 文件名称输出为参数。
2)需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输:
2.1:由于 .ok 文件将迁移到日期命名的文件夹中(格式为yyyymmdd),构建日期序列、源压缩文件存储位置、.ok文件将要迁移到的具体目录,并输出为参数。
2.2:「循环容器」节点遍历 2.1 步骤中生成的参数,使用「Shell脚本」节点在对应的目标服务器上构建和来源 FTP 服务器相同的日期文件夹,将来源 FTP 服务器上的部分文件(使用第 1 步的参数进行过滤)传输到目标服务器上。
2.3:.ok 文件传输完成后,使用「Shell脚本」遍历 .ok 文件所在的目标文件夹目录,输出 csv 文件记录已经下发成功的 .ok 文件名称和路径。
3)读取 2.3 步骤生成的 csv ,获取 .ok 文件名、文件路径、上层日期文件夹名称,输出到 DB 表中,进行文件下载记录。
4)读取 3 步骤 DB 表中的数据,构建源压缩文件路径、压缩文件将要迁移到的位置、日期、需要传输的文件名集合,进行参数输出。
5)最后循环遍历步骤 4 中的参数,在压缩文件将要迁移到的位置中构建日期文件夹,将最终下发完成的压缩文件、.ok 文件全部输出到目标端对应的目录下。传输成功之后,调用「Shell脚本」解压压缩文件,在第 3 步的 DB 表中标记传输完成的文件。
2. 示例
2.1 场景模拟
示例文件结构:cs.zip、待传输文件名称.zip
/data/demo_FR/cs目录下,存在以日期命名的文件夹,里面包含压缩文件,压缩文件被成功保存后,会生成一个同名的 .ok 文件。如下图所示:
表名.csv表格中,记录着将要传输的压缩文件生成的 .ok 文件的名称。如下图所示:
现在需要将表名.csv表格中,对应的压缩文件传输到另外一个路径下(/data/demo_files/file2下的日期文件夹中),并进行解压,且已经传输成功的文件,下次任务运行时不会再重复传输。
2.2 方案说明
具体思路可参考本文 1.2 节内容,任务设计请看下图:
2.3 准备工作
1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。
2)由于压缩文件存储在 FTP 服务器上,建议准备一个 FTP/SFTP 数据连接。
3)本文方案中需要用到 4 个 .sh 文件,具体代码可下载进行查看:.sh文件.zip
2.4 迁移.ok文件
需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输。
2.4.1 将待迁移压缩文件对应的.ok文件名称输出为参数
1)新建定时任务,拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「文件输入」算子,读取表名.csv数据,如下图所示:
点击「数据预览」,如下图所示:
3)拖入「参数输出」算子,将读取的数据输出为参数 name。如下图所示:
注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。
2.4.2 构建参数
.ok文件将迁移到日期命名的文件夹中(格式为yyyymmdd),本节目的,输出参数:
date:日期序列,便于后续步骤中构建与压缩文件所在目录相同的日期文件夹。
resources:源压缩文件存储位置。
target:构建 .ok 文件将要迁移到的具体目录。
1)拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「Spark SQL」算子,输入语句设置日期序列开始时间和结束时间。如下图所示:
输出两个日期:当年当月第一天日期、当年当月当天日期。
SELECT CONCAT(LEFT(current_date,7),"-01") as s_date, LEFT(current_date,10) as e_date
点击「数据预览」,如下图所示:
3)再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列、源压缩文件存储位置(/data/demo_FR/cs/)、.ok 文件将要迁移到的具体目录(/data/demo_files/date1/),用户根据实际情况修改 SQL 语句中的路径。
注:SQL 语句中的 Spark SQL 为点击生成。
select concat('/data/demo_FR/cs/',date_format(date,'yyyyMMdd')) as date,date_format(date,'yyyyMMdd') as time,concat('/data/demo_files/date1/',date_format(date,'yyyyMMdd')) as target
from (
SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL
) a
点击「数据预览」,如下图所示:
4)拖入「参数输出」算子,输出参数:resource、date、target。如下图所示:
注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。
2.4.3 循环遍历参数,迁移.ok文件
本节目的:
使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。
遍历压缩包所在路径/data/demo_FR/cs下,符合条件的 .ok 文件(用 2.4.1 节生成的参数name过滤文件),传输到/data/demo_files/date1下的日期文件夹中。
1)拖入「循环容器」节点,遍历 2.4.2 节生成的参数。界面设置如下图所示:
实际场景中,压缩包所在文件夹中,有的压缩包下发完成,有的可能为空(本文示例中是:存在压缩包的文件夹有 3 个,但参数值有 11 个),当使用参数传输文件时,如果有一个来源端没有符合条件的文件存在,「循环容器」节点会报错不再执行,所以需要勾选「节点报错时,继续执行循环」按钮,使所有参数都能执行。
2)「循环容器」节点中拖入「Shell脚本」节点,「Shell脚本」节点中,使用 date 日期参数,在 .ok 文件将要迁移的位置中,创建日期格式的文件夹。如下图所示:
okmkdir.sh 文件代码如下所示:
注:full_path="/data/demo_files/date1/$folder_name" 中,/data/demo_files/date1需要根据实际情况修改,为.ok文件将要迁移的位置。
#!/bin/sh
# 检查是否提供了足够的参数
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <folder_name>"
exit 1
fi
# 读取第一个参数作为文件夹名
folder_name=$1
target=$2
# 构造完整的路径
full_path="/data/demo_files/date1/$folder_name"
# 使用mkdir -p创建文件夹,确保父目录存在
mkdir -p "$full_path"
# 检查文件夹是否成功创建
if [ -d "$full_path" ]; then
echo "Folder '$full_path' created successfully."
else
echo "Failed to create folder '$full_path'."
fi
3)拖入「文件传输」算子,将源压缩文件存储位置中(resources参数值)的.ok文件(使用name参数值进行过滤),传输到目标文件夹中(target参数值)。如下图所示:
2.5 准备判断压缩文件是否迁移成功的标记表
.ok 文件传输到目标文件夹后:
使用「Shell脚本」遍历各个日期文件夹下已经下发完成的 .ok 文件,并生成 date.csv 记录 .ok 文件的路径、.ok 文件的名称。
读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,将数据写入到 ok_1 表;ok_1 表中存在字段 flag ,若值为空,则代表 .ok 对应的 zip 文件还没有进行下载,需要进行下载,下载完成后标记 flag 字段。
本节主要准备 ok_1 表。
2.5.1 生成csv表记录.ok文件的路径、名称
1)由于我们要遍历各个日期文件夹下已经下发完成的 .ok 文件,需要指定遍历目录。
在「参数列表」中新建参数 okfile,值为.ok文件目标文件夹的上一层目录,例如:.ok文件存储在/data/demo_files/date1/20240901,参数值为:/data/demo_files/date1,便于后续遍历/data/demo_files/date1下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。
2)拖入「Shell脚本」节点,引用 okfile 参数,后续遍历 okfile 参数值下所有的文件夹,找到 .ok 文件,并输出 .ok 文件的路径和名称。
需注意:
「Shell脚本」节点与「循环容器」节点的连线,判断条件应该是无条件执行。原因:「循环容器」节点参数取数时,可能取不到值导致「循环容器」节点失败,为正常现象,但不能影响后续节点的运行。
3)「Shell脚本」节点设置界面如下图所示:
okfile.sh 代码如下:
注:代码含义:找到参数值下的 .ok、.sql、.del文件,输出文件路径和名称到 /data/demo_files/test 路径下的 date.csv 文件中;本文示例中,只有 .ok 文件,代码不影响方案实现;date.csv 的路径用户根据实际情况自定义。
#!/bin/bash
# 指定要搜索的目录
#SEARCH_DIR="/data/demo_files/cs/2024-04-17/"
SEARCH_DIR=$1
# 指定输出CSV文件的名称
OUTPUT_CSV="/data/demo_files/test/date.csv"
# 使用find命令搜索文件,并将结果输出到CSV文件
find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"
echo "搜索完成,结果已保存到 $OUTPUT_CSV"
3)由于后续节点需要用到 date.csv 文件数据,所以需要运行任务生成 date.csv 文件,点击「运行」按钮即可。
2.5.2 生成记录表
读取 date.csv 文件,获取目标端 .ok 文件路径、文件名称、日期文件夹名称,并将数据写入到 ok_1 表中进行记录。
1)拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「文件输入」算子,读取 date.csv 文件。如下图所示:
点击「数据预览」,如下图所示:
3)拖入「新增计算列」算子,截取 column 列中的日期。如下图所示:
注:公式中的 column、column1 为点击生成。
left(replace(replace(column,'/data/demo_files/date1/',""),column1,""),8)
点击「数据预览」,如下图所示:
resource 字段后续会被「Shell 脚本」引用,在压缩文件的目标位置创建日期文件夹。
4)拖入「字段设置」算子,修改字段名。如下图所示:
5)拖入「DB表输出」算子,创建标记表 ok_1。如下图所示:
6)由于后续节点需要读取 ok_1 表,所以右键点击「数据转换」节点,选择「运行节点」,生成 ok_1 表。如下图所示:
7)进入 ok_1 表所在数据库中,给 ok_1 表新增字段 flag ,值默认为空,代表 .ok 对应的 zip 文件还没有进行传输。
后续步骤中,会传输并解压 zip 文件,且将 ok_1 表中 flag 字段标记为 1 。
2.6 传输并解压压缩文件
本节目的:
1)ok_1 表中过滤出 flag 为空的数据,代表 .ok 对应的 zip 文件还没有进行传输;处理 ok_1 表中的数据,输出参数:
${date1}:日期,值示例:20240902(对应 ok_1 表中的 resource 字段)。
${resource_1}:源压缩文件路径(由/data/demo_FR/cs/和 ok_1 表中的 resource 字段拼接而成)。
${target_1}:压缩文件将要迁移到的位置(由/data/demo_files/file2/和 ok_1 表中的 resource 字段拼接而成)。可自定义,修改/data/demo_files/file2/即可。
${name_1}:需要传输的文件名集合。
2.6.1 参数准备
1)拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,过滤出 ok_1 表中 flag 字段为 null 的数据(zip 文件未被传输的数据)。如下图所示:
SELECT name,resource FROM `demotest`.`ok_1`
where flag is null
点击「数据预览」,如下图所示:
3)拖入「Spark SQL」算子,处理 ok_1 表中的数据。如下图所示:
注:SQL 语句中的「DB表输入」为点击生成。
select concat_ws(',',collect_set(name)) AS swcitylist ,resource as date,concat('/data/demo_FR/cs/',resource) as resource,concat('/data/demo_files/file2/',resource) as target from
(
select replace(name,"ok","zip") as name,resource from DB表输入
union all
select name,resource from DB表输入
)
group by resource
数据处理前如下图所示:
数据处理语句介绍如下表所示:
SQL | 说明 |
---|---|
select replace(name,"ok","zip") as name,resource from DB表输入 union all select name,resource from DB表输入 | 目的: 数据未处理前,name 列只有 .ok 文件的名称,由于我们要传输 .ok 文件以及对应的 zip 文件(zip 文件与 .ok 文件同名),所以需要构建 zip 文件名称 步骤: 使用 replace 函数,将 ok 替换为 zip,替换后的 name 字段与替换前的 name 字段,合并(也读取合并了 resource 字段,resource 字段不做处理) |
select concat_ws(',',collect_set(name)) AS swcitylist | 从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作 创建字段 swcitylist ,值为: 根据 resource 字段,先使用 collect_set 函数对 name 字段进行去重,然后使用 concat_ws 函数将去重后的数据按照逗号进行拼接 目的: 构建需要传输的文件名集合 |
concat('/data/demo_FR/cs/',resource) as resource | 从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作 创建字段 resource ,值为: 将固定路径/data/demo_FR/cs/与待处理 resource 字段拼接 目的: 构建源压缩文件的位置,用户根据实际情况修改/data/demo_FR/cs/ |
concat('/data/demo_files/file2/',resource) as target | 从表格第一行 SQL 语句构建的结果集中取数做以下数据处理操作 创建字段 target ,值为: 将固定路径/data/demo_files/file2/与待处理 resource 字段拼接 目的: 构建压缩文件目标位置,用户可自定义/data/demo_files/file2/的值 |
数据处理后,结果如下图所示:
4)拖入「参数输出」算子,输出参数。如下图所示:
2.6.2 传输解压压缩文件
1)需要拖入「循环容器」节点,循环遍历 2.6.1 节中的参数。如下图所示:
「节点报错时,继续执行循环」按钮,可不勾选。
2)「循环容器」节点中拖入「Shell脚本」节点,「Shell脚本」节点引用 date1 参数,在压缩文件要传输的目标端创建日期文件夹。如下图所示:
mkdir.sh 代码如下所示:
注:full_path="/data/demo_files/file2/$folder_name" 中,/data/demo_files/file2/根据实际情况修改为压缩包的目标路径。
#!/bin/sh
# 检查是否提供了足够的参数
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <folder_name>"
exit 1
fi
# 读取第一个参数作为文件夹名
folder_name=$1
# 构造完整的路径
full_path="/data/demo_files/file2/$folder_name"
# 使用mkdir -p创建文件夹,确保父目录存在
mkdir -p "$full_path"
# 检查文件夹是否成功创建
if [ -d "$full_path" ]; then
echo "Folder '$full_path' created successfully."
else
echo "Failed to create folder '$full_path'."
fi
3)拖入「文件传输」算子,将${resource_1}来源端路径下,${name_1}文件名集合中的文件,传输到${target_1}路径下。如下图所示:
4)拖入「Shell脚本」节点,将 ${target_1} 路径下的 zip 文件解压到 ${target_1} 路径下。如下图所示:
zip.sh 代码如下所示:
#!/bin/sh
# 检查是否提供了足够的参数
if [ "$#" -ne 2 ]; then
echo "Usage: $0 <directory_path> <output_directory>"
exit 1
fi
# 读取目录路径和解压目标目录
directory_path=$1
output_directory=$2
# 确保输出目录存在
mkdir -p "$output_directory"
# 遍历目录下的所有ZIP文件并解压到指定目录
find "$directory_path" -type f -name "*.zip" -exec unzip -o {} -d "$output_directory" \;
# 注意:这里的-o选项可能不起作用,具体取决于unzip的版本。
# 如果unzip没有-o选项用于覆盖,你可能需要移除它或处理已存在的文件。
注:图中的第一个参数代表目标端 zip 文件位置,第二个参数代表 zip 文件解压后的位置,用户根据实际情况设置第二个参数,本文示例是将解压后的 zip 文件与解压前放在一起。
5)拖入「SQL脚本」,已经传输成功的 zip 文件,在 ok_1 表中记录 flag 为 1。如下图所示:
update `demotest`.`ok_1`
set flag =1
where resource='${date1}'
2.7 效果查看
1)运行任务,任务运行成功后,日志如下图所示:
本文示例中,第一个「循环容器」节点内,文件传输时如果有一个参数对应的来源端没有符合条件的文件存在,「循环容器」节点状态会为失败,由于我们实际的文件为 3 个,但构建的文件夹为 11 个,肯定会存在取不到数据的情况,为正常现象。
2)本文示例中,将 .ok 文件传输到/data/demo_files/date1下,任务运行后,该目录下内容如下图所示:
3)压缩文件传输到/data/demo_files/file2下,并在该目录下解压。如下图所示:
4)标记表 ok_1 如下图所示:
2.8 后续步骤
1)点击「发布」按钮,将任务发布到 生产模式 下。
2)根据实际情况设置执行频率。详情请参见:调度计划概述