1. 概述
1.1 应用场景
用户业务流程中下发的文件,有特定的文件生成则代表下发完成。比如:文件下发完成后,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。
现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。
注1:本文方案适用于 Linux 系统。
注2:本文方案适用于读取 csv、excel、txt 文件,本文示例是读取的 csv 文件,读取其他类型的文件,需要根据实际情况修改方案细节。
1.2 实现思路
1)将需要遍历的目录生成参数,「Shell 脚本」节点使用该参数获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,拼接出待读取的 csv 文件路径,将数据保存到到数据库表中。
2)根据第一步输出的数据库表,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。
3)遍历第二步的参数,执行以下步骤:
判断文件是否已读取,若未读取,执行下面步骤。
读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。
记录已写入 csv 文件的路径、写入表、写入时间。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 中的定时任务:「获取目录下符合条件的文件,按写入方式读取入库并记录状态」。
2. 示例
2.1 场景模拟
示例文件结构:csv.zip
某银行用户,业务流程中下发的文件,若下发完成,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。如下图所示:
其中,文件所处的目录结构如下图所示:
csv 文件夹下的文件夹是以日期命名的(本文示例是以 2024 年 8 月份日期命名)。
点击展开更多 |
现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。
希望每天检查一次,是否有新的已下发完成的文件。
2.2 方案说明
1)由于 csv 下,文件夹是以日期命名的,构建遍历目录并输出为参数,参数值示例:/data/demo_files/csv/2024-08-01/
2)「Shell 脚本」节点使用该参数遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,新增列拼接出待读取的 csv 文件路径,将数据输出到 path1 表中。
3)根据 path1 表数据,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。
4)遍历第三步的参数,执行以下步骤:
根据待读取 csv 文件路径是否在 table_info 表中,判断文件是否已读取,若未读取,执行下面步骤。
读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。
table_info 表记录已写入 csv 文件的路径、写入表、写入时间。
2.3 准备工作
1)由于需要用到「Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。
2)「Shell 脚本」节点运行后,会生成一个 output.csv 文件,由于需要使用「文件输入」算子读取该文件,建议准备一个 FTP/SFTP 数据连接。
2.4 构建遍历目录参数
本文示例中,需要使用「Shell 脚本」节点到每个日期命名的文件夹中找到 .ok、.sql、.del 文件,所以需要先构建构建遍历目录并输出为参数。
2.4.1 设置日期序列开始时间、结束时间
实际场景中,会有每个月所有天的日期文件夹,由于需要遍历所有文件夹找到 .ok、.sql、.del 文件,需要构建日期序列。
1)新建定时任务,拖入一个「数据转换」节点,进入「数据转换」节点。
2)拖入「Spark SQL」算子,输入语句设置日期序列开始时间和结束时间。如下图所示:
本文示例(构建的是 2024 年 8 月份的日期序列):
SELECT DATE_FORMAT(CONCAT(YEAR(CURRENT_DATE()), '-08-01'), 'yyyy-MM-dd') AS s_date,
LAST_DAY(CONCAT(YEAR(CURRENT_DATE()), '-08-01')) AS e_date
其他方案(可构建当年当月第一天到当天的日期序列,用户根据实际情况选择):
SELECT CONCAT(LEFT(current_date,7),"-01") as s_date,LEFT(current_date,10) as e_date
点击「数据预览」,如下图所示:
2.4.2 构建日期序列
再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列。如下图所示:
SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL
注:SQL 语句中的 Spark SQL 为点击生成。
点击「数据预览」,如下图所示:
2.4.3 构建遍历目录并生成参数
本文示例中,目录结构如下图所示:
后续步骤中,「Shell 脚本」节点需要到/data/demo_files/csv/2024-08-XX/下,找到符合条件的文件,本文需要构建遍历目录。
1)再拖入一个「Spark SQL」算子,构建遍历目录。如下图所示:
select concat('\/data\/demo_files\/csv\/',date,"\/") as path from Spark SQL1
注:Spark SQL1 为点击生成;用户根据实际路径修改 SQL 语句。
点击「数据预览」,如下图所示:
2)拖入「参数输出」算子,将生成的路径输出为参数。如下图所示:
注:建议设置调试值,便于后续节点引用该参数时查看效果;调试值不影响实际运行结果。
2.5 获取指定目录下符合条件的文件,并拼接下发完成的csv文件路径
2.5.1 获取指定目录下符合条件的文件
1)由于 2.4 节生成的参数为多个,「Shell 脚本」节点每次只能使用一个参数获取符合条件的文件信息,所以需要用到「循环容器」节点。
拖入「循环容器」节点,遍历目录参数。如下图所示:
2)「循环容器」节点内拖入「Shell 脚本」节点,遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中。如下图所示:
.sh文件示例:.sh文件.zip
.sh文件代码如下所示:
注:用户根据实际情况修改代码;代码中的 $1 为 2.4.3 节输出的参数名称。
#!/bin/bash
# 指定要搜索的目录
#SEARCH_DIR="/data/demo_files/csv/2024-04-17/"
SEARCH_DIR=$1
# 指定输出CSV文件的名称
OUTPUT_CSV="/data/demo_files/csv/output.csv"
# 使用find命令搜索文件,并将结果输出到CSV文件
find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"
echo "搜索完成,结果已保存到 $OUTPUT_CSV"
output.csv 文件说明:
「Shell 脚本」节点执行后,会生成一个 output.csv 文件,本文示例中,为避免 output.csv 文件后期过大,.sh 文件每次执行都是清空 output.csv 文件数据再写入,而非追加写入。output.csv 文件数据如下图所示:
由于后续步骤中要读取 output.csv 文件,但此时该文件并未生成,建议在 output.csv 文件路径下上传一个示例文件(由于每次都是清空目标表再写入,示例数据不会影响实际任务运行结果)。如下图所示:
示例文件:output文件.zip
2.5.2 拼接下发完成的csv文件路径
下发完成的 csv 文件,保存在 file 文件夹中。如下图所示:
本节需要根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径,便于后续读取 csv 文件数据。
1)「Shell 脚本」节点后拖入一个「数据转换」节点,进入「数据转换」节点。拖入「文件输入」算子,读取 output.csv 文件数据。如下图所示:
点击「数据预览」,如下图所示:
2)拖入一个「新增计算列」算子,根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径。
新增 name 列,值为:CONCATENATE(left(#{column},find("/_thread",#{column})),"file/",left(#{column1},find(".",#{column1})),"csv")
新增 date 列,值为:FORMAT(NOW(),"yyyy-MM-dd hh:mm:ss")
如下图所示:
点击「数据预览」,如下图所示:
3)拖入「字段设置」算子,修改字段名。如下图所示:
点击「数据预览」,如下图所示:
path 字段值为 .ok、.sql、.del 文件路径,file 字段值为已经下发完成的 csv 文件路径。
4)拖入「DB表输出」算子,将数据输出到 path1 表中。如下图所示:
写入方式选择「追加写入数据」,逻辑主键设置为「path」字段,主键冲突策略选择「主键冲突,覆盖目标表的数据」。如下图所示:
5)由于 path1 表后续会用到,点击「运行」按钮,运行任务生成 path1 表。如下图所示:
任务运行后,path1 表数据如下图所示:
2.6 将.del文件路径、待读取csv文件路径输出为参数
1)「循环容器」节点外接一个「数据转换」节点,进入「数据转换」节点。
2)拖入「DB表输入」算子,读取 path1 表数据。如下图所示:
3)拖入「Spark SQL」算子,过滤出 .del 文件的数据。如下图所示:
.del 文件不同值代表不同的写入方式。
点击「数据预览」,如下图所示:
4)拖入「参数输出」算子,将 path、file 字段输出为参数。如下图所示:
path 字段值:不同值代表不同写入方式;可根据该值判断写入方式。
file 字段值:已下发完成的 csv 文件路径;可根据该值读取 csv 文件数据。
2.7 根据不同写入方式写入数据
2.7.1 设置循环容器
由于 2.6 节输出的参数为多个,「文件输入」算子每次读取 csv 文件时OR「条件分支」节点每次根据参数值进行判断时,只能使用一个参数。所以,需要使用到「循环容器」节点,遍历所有参数。
拖入「循环容器」节点,遍历对象为 2.6 节输出的两个参数。如下图所示:
2.7.2 判断csv文件是否读取过
每次循环时,判断传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的记录数是否为 0 ,若为 0 ,则执行后续节点。
1)设计任务时,需要创建一张 table_info 表:table_info.xlsx
表字段如下图所示:
该表记录已写入 csv 文件的路径、.del 文件值(代表不同写入方式)、csv 文件写入表名称、写入时间。
2)「循环容器」节点内拖入「数据转换」节点,进入「数据转换」节点。
3)拖入「DB表输入」算子,统计传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的数量。如下图所示:
SELECT count(*) as t FROM `demotest`.`table_info`
where file='${file}'
4)将 SQL 语句的值输出为参数。如下图所示:
5)拖入「条件分支」节点、「数据转换」节点,「条件分支」节点中设置若参数 t 为 0(该 csv 文件未被读取过),则执行后续节点。如下图所示:
2.7.3 获取写入方式并写入数据
写入方式记录在 .del 文件中,当值为 0 时,清空目标表再写入数据;当值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。
使用 table_info 表记录已经读取完成的文件,以便在任务异常重启时避免重复写入。
1)进入「数据转换4」节点,拖入「文件输入」算子,读取 .del 文件。如下图所示:
2)拖入「参数输出」算子,将 .del 文件的值输出,便于后续判断写入方式。如下图所示:
3)拖入「条件分支」节点、三个「数据转换」节点,根据 m 参数值的不同,执行不同下游分支节点。
当 m 值为 0 时,清空目标表再写入数据;值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。
数据转换5节点:全量更新数据
拖入「文件输入」算子,读取 csv 文件。如下图所示:
点击「数据预览」,如下图所示:
拖入「DB表输出」算子,将数据输出到数据库表中。如下图所示:
数据转换6节点:增量更新数据
增量更新数据会写入到数据库表 b 中,设计任务时,需要在数据库中创建一张表 b ,便于与 csv 文件数据对比。
表 b 数据示例:b.xlsx
表 b 数据如下图所示:
拖入「文件输入」算子,读取 csv 文件。如下图所示:
拖入「DB表输入」算子,读取表 b 数据。如下图所示:
拖入「数据比对」算子,对比数据。如下图所示:
拖入「DB表输出」算子,执行新增操作。如下图所示:
数据转换7节点:进行比对后,新增和修改数据
新增、修改数据会写入到数据库表 c 中,设计任务时,需要在数据库中创建一张表 c ,便于与 csv 文件数据对比。
表 c 数据示例:c.xlsx
表 c 数据如下图所示:
与数据转换6节点不同的是,「DB表输出」算子设置不同:
2.7.4 记录已经读取完成的文件
table_info 表记录已写入 csv 文件的路径、写入表、写入时间。
1)拖入三个「数据转换」节点,分别与前面的「数据转换」节点相连。
2)进入「数据转换8节点」,拖入「Spark SQL」算子,SQL 语句如下:
select '${file}' as file,'${m}' as m ,'a' as table,current_timestamp as time
获取已写入 csv 文件的路径、.del文件值(不同值代表不同写入方式)、最终数据写入表名、当前时间,如下图所示:
3)拖入「DB表输出」算子,将数据写入到 table_info 表中。如下图所示:
数据转换9节点、数据转换10节点与数据转换8节点不同的是,「Spark SQL」算子的 SQL 语句:
数据转换9节点:select '${file}' as file,'${m}' as m ,'b' as table,current_timestamp as time
数据转换10节点:select '${file}' as file,'${m}' as m ,'c' as table,current_timestamp as time
3. 效果查看
1)点击「运行」按钮,任务执行成功后,如下图所示:
2)path1 表数据如下图所示:
table_info 表数据如下图所示:
a 表数据如下图所示:
b 表数据如下图所示:
c 表数据如下图所示:
3)点击「发布」按钮,将任务发布到生产模式。如下图所示:
可设置执行频率。详情请参见:定时调度