历史版本6 :获取目录下符合条件的文件,按写入方式读取入库并记录状态 返回文档
编辑时间: 内容长度:图片数:目录数: 修改原因:

目录:

1. 概述编辑

1.1 应用场景

用户业务流程中下发的文件,有特定的文件生成则代表下发完成。比如:文件下发完成后,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。

现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

注:本文方案适用于 Linux 系统。

1.2 实现思路

1)将需要遍历的目录生成参数,「Shell 脚本」节点使用该参数获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,拼接出待读取的 csv 文件路径,将数据保存到到数据库表中。

2)根据第一步输出的数据库表,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。

3)遍历第二步的参数,执行以下步骤:

  • 判断文件是否已读取,若未读取,执行下面步骤。

  • 读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。

  • 记录已写入 csv 文件的路径、写入表、写入时间。

2. 示例编辑

2.1 场景模拟

示例文件结构:csv.zip

某银行用户,业务流程中下发的文件,若下发完成,目录下会生成与文件同名的 .ok 文件(用来判断文件是否下发完成)、.del 文件(根据值的不同判断写入方式)、.sql 文件。如下图所示:

1725245363179869.png

其中,文件所处的目录结构如下图所示:

csv 文件夹下的文件夹是以日期命名的(本文示例是以 2024 年 8 月份日期命名)。

点击展开更多
4.png

现需要将已经下发完成(生成 .ok 文件)的 csv 文件按照 .del 记录的方式进行读取入库,并且读取文件后,需要记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

希望每天检查一次,是否有新的已下发完成的文件。

2.2 方案说明

1)由于 csv 下,文件夹是以日期命名的,构建遍历目录并输出为参数,参数值示例/data/demo_files/csv/2024-08-01/

2)「Shell 脚本」节点使用该参数遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中,然后根据 output.csv 文件数据,新增列拼接出待读取的 csv 文件路径,将数据输出到 path1 表中。

3)根据 path1 表数据,将 .del 文件(需要根据该文件判断写入方式)路径、待读取 csv 文件路径输出为参数。

4)遍历第三步的参数,执行以下步骤:

  • 根据待读取 csv 文件路径是否在 table_info 表中,判断文件是否已读取,若未读取,执行下面步骤。

  • 读取 del 文件数据并输出为参数,根据参数值的不同,判断写入方式,根据不同的写入方式,将数据写入不同的表中。

  • table_info 表记录已写入 csv 文件的路径、写入表、写入时间。

61.png

2.3 准备工作

1)由于需要用到Shell 脚本」节点,所以需要配置 SSH 协议数据连接,为执行 shell 脚本做准备。

2)「Shell 脚本」节点运行后,会生成一个 output.csv 文件,由于需要使用文件输入算子读取该文件,建议准备一个 FTP/SFTP 数据连接。

2.4 构建遍历目录参数

本文示例中,需要使用「Shell 脚本」节点到每个日期命名的文件夹中找到 .ok、.sql、.del 文件,所以需要先构建构建遍历目录并输出为参数。

1725248286301164.png

2.4.1 设置日期序列开始时间、结束时间

实际场景中,会有每个月所有天的日期文件夹,由于需要遍历所有文件夹找到 .ok、.sql、.del 文件,需要构建日期序列。

1)新建定时任务,拖入一个数据转换节点,进入数据转换节点。

2)拖入「Spark SQL」算子,输入语句设置日期序列开始时间和结束时间。如下图所示:

本文示例(构建的是 2024 年 8 月份的日期序列):

SELECT DATE_FORMAT(CONCAT(YEAR(CURRENT_DATE()), '-08-01'), 'yyyy-MM-dd') AS s_date,

LAST_DAY(CONCAT(YEAR(CURRENT_DATE()), '-08-01')) AS e_date

其他方案(可构建当年当月第一天到当天的日期序列,用户根据实际情况选择):

SELECT CONCAT(LEFT(current_date,7),"-01") as s_date,LEFT(current_date,10) as e_date

8.png

点击「数据预览」,如下图所示:

1725259622140907.png

2.4.2 构建日期序列

再拖入一个「Spark SQL」算子,使用 SQL 语句构建日期序列。如下图所示:

SELECT explode(sequence(to_date(s_date), to_date(e_date), interval 1 day)) as date from Spark SQL

注:SQL 语句中的 Spark SQL 为点击生成。

10.png

点击数据预览,如下图所示:

1725259837612850.png

2.4.3 构建遍历目录并生成参数

本文示例中,目录结构如下图所示:

29.png

后续步骤中,「Shell 脚本」节点需要到/data/demo_files/csv/2024-08-XX/下,找到符合条件的文件,本文需要构建遍历目录。

1)再拖入一个「Spark SQL」算子,构建遍历目录。如下图所示:

select concat('\/data\/demo_files\/csv\/',date,"\/") as path  from Spark SQL1

注:Spark SQL1 为点击生成;用户根据实际路径修改 SQL 语句。

13.png

点击数据预览,如下图所示:

1725260186227943.png

2)拖入「参数输出」算子,将生成的路径输出为参数。如下图所示:

注:建议设置调试值,便于后续节点引用该参数时查看效果;调试值不影响实际运行结果。

15.png

2.5 获取指定目录下符合条件的文件,并拼接下发完成的csv文件路径

1725260644115264.png

2.5.1 获取指定目录下符合条件的文件

1)由于 2.4 节生成的参数为多个,「Shell 脚本」节点每次只能使用一个参数获取符合条件的文件信息,所以需要用到循环容器节点。

拖入循环容器节点,遍历目录参数。如下图所示:

17.png

2)循环容器节点内拖入「Shell 脚本」节点,遍历获取指定目录下的 .ok、.sql、.del 文件,将文件名和文件路径输入到 output.csv 中。如下图所示:

.sh文件示例:.sh文件.zip

.sh文件代码如下所示:

注:用户根据实际情况修改代码;代码中的 $1 为 2.4.3 节输出的参数名称。

#!/bin/bash  
  
# 指定要搜索的目录  
#SEARCH_DIR="/data/demo_files/csv/2024-04-17/"
 SEARCH_DIR=$1
# 指定输出CSV文件的名称  
OUTPUT_CSV="/data/demo_files/csv/output.csv"  
  
# 使用find命令搜索文件,并将结果输出到CSV文件  
find "$SEARCH_DIR" -type f \( -name "*.ok" -o -name "*.sql" -o -name "*.del" \) -printf "%p,%f\n" > "$OUTPUT_CSV"  
  
echo "搜索完成,结果已保存到 $OUTPUT_CSV"

18.png

output.csv 文件说明:

「Shell 脚本」节点执行后,会生成一个 output.csv 文件,本文示例中,为避免 output.csv 文件后期过大,.sh 文件每次执行都是清空 output.csv 文件数据再写入,而非追加写入。output.csv 文件数据如下图所示:

1725262112795196.png

由于后续步骤中要读取 output.csv 文件,但此时该文件并未生成,建议在 output.csv 文件路径下上传一个示例文件(由于每次都是清空目标表再写入,示例数据不会影响实际任务运行结果)。如下图所示:

示例文件:output文件.zip

29.png

2.5.2 拼接下发完成的csv文件路径

下发完成的 csv 文件,保存在 file 文件夹中。如下图所示:

1725263423649442.png

本节需要根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径,便于后续读取 csv 文件数据。

1)「Shell 脚本」节点后拖入一个数据转换节点,进入数据转换节点。拖入文件输入算子,读取 output.csv 文件数据。如下图所示:

22.png

点击数据预览,如下图所示:

25.png

2)拖入一个新增计算列算子,根据 output.csv 文件数据,拼接出已经下发完成的 csv 文件路径。

新增 name 列,值为:CONCATENATE(left(#{column},find("/_thread",#{column})),"file/",left(#{column1},find(".",#{column1})),"csv")

新增 date 列,值为:FORMAT(NOW(),"yyyy-MM-dd hh:mm:ss")

如下图所示:

23.png

点击「数据预览」,如下图所示:

24.png

3)拖入字段设置算子,修改字段名。如下图所示:

27.png

点击数据预览,如下图所示:

path 字段值为 .ok、.sql、.del 文件路径,file 字段值为已经下发完成的 csv 文件路径。

28.png

4)拖入DB表输出算子,将数据输出到 path1 表中。如下图所示:

30.png

写入方式选择追加写入数据,逻辑主键设置为path字段,主键冲突策略选择主键冲突,覆盖目标表的数据。如下图所示:

1725264334676565.png

5)由于 path1 表后续会用到,点击运行按钮,运行任务生成 path1 表。如下图所示:

32.png

任务运行后,path1 表数据如下图所示:

33.png

2.6 将.del文件路径、待读取csv文件路径输出为参数

1)「循环容器」节点外接一个「数据转换」节点,进入「数据转换」节点。

2)拖入DB表输入算子,读取 path1 表数据。如下图所示:

34.png

3)拖入「Spark SQL」算子,过滤出 .del 文件的数据。如下图所示:

.del 文件不同值代表不同的写入方式。

35.png

点击数据预览,如下图所示:

36.png

4)拖入参数输出算子,将 path、file 字段输出为参数。如下图所示:

  • path 字段值:不同值代表不同写入方式;可根据该值判断写入方式。

  • file 字段值:已下发完成的 csv 文件路径;可根据该值读取 csv 文件数据。

1725265468595681.png

2.7 根据不同写入方式写入数据

62.png

2.7.1 设置循环容器

由于 2.6 节输出的参数为多个,「文件输入」算子每次读取 csv 文件时OR条件分支节点每次根据参数值进行判断时,只能使用一个参数。所以,需要使用到「循环容器」节点,遍历所有参数。

拖入「循环容器」节点,遍历对象为 2.6 节输出的两个参数。如下图所示:

39.png

2.7.2 判断csv文件是否读取过

每次循环时,判断传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的记录数是否为 0 ,若为 0 ,则执行后续节点。

1)设计任务时,需要创建一张 table_info 表:table_info.xlsx

表字段如下图所示:

1725268863564333.png

该表记录已写入 csv 文件的路径、.del 文件值(代表不同写入方式)、csv 文件写入表名称、写入时间。

2)「循环容器」节点内拖入数据转换节点,进入数据转换节点。

3)拖入DB表输入算子,统计传入的 file 参数(已下发完成的 csv 文件路径)与记录表 table_info 表中的 file 相同的数量如下图所示:

SELECT count(*) as t FROM `demotest`.`table_info`
where file='${file}'

41.png

4)将 SQL 语句的值输出为参数。如下图所示:

42.png

5)拖入「条件分支」节点、数据转换节点,条件分支节点中设置若参数 t 为 0(该 csv 文件未被读取过),则执行后续节点。如下图所示:

43.png

2.7.3 获取写入方式并写入数据

62.png

写入方式记录在 .del 文件中,当值为 0 时,清空目标表再写入数据;当值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。

使用 table_info 表记录已经读取完成的文件,以便在任务异常重启时避免重复写入。

1)进入「数据转换4」节点,拖入文件输入算子,读取 .del 文件。如下图所示:

44.png

2)拖入「参数输出」算子,将 .del 文件的值输出,便于后续判断写入方式。如下图所示:

45.png

3)拖入「条件分支」节点、三个数据转换节点,根据 m 参数值的不同,执行不同下游分支节点。

当 m 值为 0 时,清空目标表再写入数据;值为 1 时,新增数据;当值为 2 时,增加/更新表数据;不同写入方式写入到不同表中。

63.png

数据转换5节点:全量更新数据

拖入「文件输入」算子,读取 csv 文件。如下图所示:

47.png

点击「数据预览」,如下图所示:

1725274893436443.png

拖入DB表输出算子,将数据输出到数据库表中。如下图所示:

49.png

数据转换6节点:增量更新数据

1725275182845703.png

增量更新数据会写入到数据库表 b 中,设计任务时,需要在数据库中创建一张表 b ,便于与 csv 文件数据对比。

表 b 数据示例:b.xlsx

表 b 数据如下图所示:

1725275327957360.png

拖入文件输入算子,读取 csv 文件。如下图所示:

52.png

拖入「DB表输入」算子,读取表 b 数据。如下图所示:

53.png

拖入「数据比对」算子,对比数据。如下图所示:

54.png

拖入「DB表输出」算子,执行新增操作。如下图所示:

55.png

数据转换7节点:进行比对后,新增和修改数据

1725276083552045.png

新增、修改数据会写入到数据库表 c 中,设计任务时,需要在数据库中创建一张表 c ,便于与 csv 文件数据对比。

表 c 数据示例:c.xlsx

表 c 数据如下图所示:

1725276189707427.png

数据转换6节点不同的是,「DB表输出」算子设置不同:

56.png

2.7.4 记录已经读取完成的文件

58.png

table_info 表记录已写入 csv 文件的路径、写入表、写入时间。

1)拖入三个「数据转换」节点,分别与前面的数据转换节点相连。

2)进入数据转换8节点,拖入Spark SQL算子,SQL 语句如下:

select '${file}' as file,'${m}' as m ,'a' as table,current_timestamp as time

获取已写入 csv 文件的路径、.del文件值(不同值代表不同写入方式)、最终数据写入表名、当前时间,如下图所示:

59.png

3)拖入「DB表输出」算子,将数据写入到 table_info 表中。如下图所示:

60.png

数据转换9节点数据转换10节点数据转换8节点不同的是,Spark SQL算子的 SQL 语句:

数据转换9节点:select '${file}' as file,'${m}' as m ,'b' as table,current_timestamp as time

数据转换10节点:select '${file}' as file,'${m}' as m ,'c' as table,current_timestamp as time

3. 效果查看编辑

1)点击运行按钮,任务执行成功后,如下图所示:

65.png

2)path1 表数据如下图所示:

64.png

table_info 表数据如下图所示:

1725280817712230.png

a 表数据如下图所示:

1725280911770101.png

b 表数据如下图所示:

1725280951992517.png

c 表数据如下图所示:

1725280987532082.png

3)点击「发布」按钮,将任务发布到生产模式。如下图所示:

70.png

可设置执行频率。详情请参见:定时调度