1. 概述编辑
1.1 应用场景
用户的业务数据以压缩包的形式每日进行下发,存储在 FTP 文件服务器对应的日期目录下,已经下发完成的压缩包文件会生成同名的 .ok 文件。
用户需要将部分压缩包文件传输到另外一个位置,并进行解压,希望已经传输成功的文件,下次任务运行时不会再重复传输。
注:若用户希望将已解压的文件(csv、excel、txt)数据,进行读取入库,方案请参见:获取目录下符合条件的文件,按写入方式读取入库并记录状态
1.2 实现思路
1)将需要进行传输的压缩包对应的 .ok 文件名称输出为参数。
2)需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输:
2.1:由于 .ok 文件将迁移到日期命名的文件夹中(格式为yyyymmdd),构建日期序列、源压缩文件存储位置、.ok文件将要迁移到的具体目录,并输出为参数。
2.2:「循环容器」节点遍历 2.1 步骤中生成的参数,使用「Shell脚本」节点在对应的目标服务器上构建和来源 FTP 服务器相同的日期文件夹,将来源 FTP 服务器上的部分文件(使用第 1 步的参数进行过滤)传输到目标服务器上。
2.3:.ok 文件传输完成后,使用「Shell脚本」遍历 .ok 文件所在的目标文件夹目录,输出 csv 文件记录已经下发成功的 .ok 文件名称和路径。
3)读取 2.3 步骤生成的 csv ,获取 .ok 文件名、文件路径、上层日期文件夹名称,输出到 DB 表中,进行文件下载记录。
4)读取 3 步骤 DB 表中的数据,构建源压缩文件路径、压缩文件将要迁移到的位置、日期、需要传输的文件名集合,进行参数输出。
5)最后循环遍历步骤 4 中的参数,在压缩文件将要迁移到的位置中构建日期文件夹,将最终下发完成的压缩文件、.ok 文件全部输出到目标端对应的目录下。传输成功之后,调用「Shell脚本」解压压缩文件,在第 3 步的 DB 表中标记传输完成的文件。
2. 示例编辑
2.1 场景模拟
示例文件结构:cs.zip、待传输文件名称.zip
/data/demo_FR/cs目录下,存在以日期命名的文件夹,里面包含压缩文件,压缩文件被成功保存后,会生成一个同名的 .ok 文件。如下图所示:
表名.csv表格中,记录着将要传输的压缩文件生成的 .ok 文件的名称。如下图所示:
现在需要将表名.csv表格中,对应的压缩文件传输到另外一个路径下(/data/demo_files/file2下的日期文件夹中),并进行解压,且已经传输成功的文件,下次任务运行时不会再重复传输。
2.2 方案说明
具体思路可参考本文 1.2 节内容,任务设计请看下图:
2.3 准备工作
1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。
2)由于压缩文件存储在 FTP 服务器上,建议准备一个 FTP/SFTP 数据连接。
2.4 迁移.ok文件
需要先迁移 .ok 文件,便于后续步骤中根据 .ok 文件路径,判断压缩文件是否已成功传输。
2.4.1 将待迁移压缩文件对应的.ok文件名称输出为参数
1)新建定时任务,拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「文件输入」算子,读取表名.csv数据,如下图所示:
点击「数据预览」,如下图所示:
3)拖入「参数输出」算子,将读取的数据输出为参数 name。如下图所示:
注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。
2.4.2 构建参数
.ok文件将迁移到日期命名的文件夹中(格式为yyyymmdd),本节目的,输出参数:
date:日期序列,便于后续步骤中构建与压缩文件所在目录相同的日期文件夹。
resources:源压缩文件存储位置。
target:构建 .ok 文件将要迁移到的具体目录。
1)拖入「数据转换」节点,进入「数据转换」节点。
2)拖入「Spark SQL」算子,输入语句设置日期序列开始时间和结束时间。如下图所示:
输出两个日期:当年当月第一天日期、当年当月当天日期。
SELECT CONCAT(LEFT(current_date,7),"-01") as s_date, LEFT(current_date,10) as e_date
点击「数据预览」,如下图所示:
3)再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列、源压缩文件存储位置(/data/demo_FR/cs/)、.ok 文件将要迁移到的具体目录(/data/demo_files/date1/),用户根据实际情况修改 SQL 语句中的路径。
注:SQL 语句中的 Spark SQL 为点击生成。
select concat('/data/demo_FR/cs/',date_format(date,'yyyyMMdd')) as date,date_format(date,'yyyyMMdd') as time,concat('/data/demo_files/date1/',date_format(date,'yyyyMMdd')) as target
from (
SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL
) a
点击「数据预览」,如下图所示:
4)拖入「参数输出」算子,输出参数:resource、date、target。如下图所示:
注:建议设置参数调试值,便于后续节点引用参数时查看效果;该调试值并不影响实际运行结果。
2.4.3 循环遍历参数,迁移.ok文件
本节目的:
使用 date 日期参数,在.ok文件将要迁移的位置中,创建日期格式的文件夹。
遍历压缩包所在路径/data/demo_FR/cs下,符合条件的 .ok 文件(用 2.4.1 节生成的参数name过滤文件),传输到/data/demo_files/date1下的日期文件夹中。
1)拖入「循环容器」节点,遍历 2.4.2 节生成的参数。界面设置如下图所示:
由于压缩包所在文件夹中,