1. 概述
1.1 应用场景
大数据量场景下的 json 数据解析,存在问题如下:
如果存在几条不合法的 json 格式数据,解析时会导致整个定时任务运行终止。
json 解析是数据处理过程,定时任务的脏数据容忍没办法排除 json 不合法数据对任务的影响。
用户希望:
过滤出不合法的 json 格式数据,使其不影响定时任务的运行。
大数据量场景下,快速找出不合法的 json 格式数据。
1.2 实现思路
使用 Python 的 is_valid_json 判断 json 是否合法,合法的的 json 参与解析。
1.3 任务展示
FineDataLink 中的数据处理过程,详情参见:https://demo.finedatalink.com/ 「JSON解析前过滤出不合法的数据」。
2. 操作步骤
2.1 准备工作
1)本文方案中需要用到「Python算子」,需参考 Python 算子 文档准备环境、了解使用方法。
2)本文方案中,json 数据保存在 txt 文件中,所以需要准备:配置FTP/SFTP数据连接、或者配置服务器本地目录数据连接
待解析的 json 数据如下所示:
json 示例数据:test_1.txt
2.2 读取 json 数据
1)新建定时任务,拖入「数据转换」节点,进入「数据转换」节点。
2)本文方案中,json 数据保存在 txt 文件中,所以需要拖入「文件输入」算子读取 json 数据。用户可根据实际情况修改该步骤。如下图所示:
「读取文件后缀」设置txt;列分隔符为「无」;不勾选「首行为字段名」,输出字段选择「手动获取」,输出字段类型为varchar、名称为column。
点击「数据预览」,如下图所示:
2.3 过滤出不合法json数据
1)拖入「Python算子」,使用 Python 的 is_valid_json 判断 json 是否合法。如下图所示:
注:下方代码中的 input 的值需要根据实际情况修改。
import pandas as pd
# 必须使用pandas库
# 若连接了数据源,可以鼠标点击上方相应数据源进行使用,输入源在python中以pandas的Dataframe数据类型存在,可以通过Dataframe的方法进行数据处理
# ----------------------------------------
import json
def is_valid_json(json_string):
try:
json.loads(json_string)
return True
except json.JSONDecodeError:
return False
# 使用示例
input=$[文件输入]
a=[]
for row in input.index:
json_string=input.loc[row]['column']
a.append(is_valid_json(json_string))
input['isvalid']=a
# ----------------------------------------
output = input
# 将需要输出给下游算子的数据赋值给output变量,当output是Dataframe数据类型时,输出为二维表的形式,当output是其他数据类型时,输出为字符串的形式
代码含义:
遍历输入数据的每一行,将每行的特定列(column列)中的 json 字符串进行解析,检查其是否是有效的 json 格式。然后,将检查结果添加到一个新列 isvalid 中,表示每个 json 字符串是否有效。最后,将带有新列的输入数据作为输出传递给下游算子。
2)拖入「数据过滤」算子,过滤出 isvalid 字段为真的数据(json 格式合法的数据)。如下图所示:
2.4 解析数据
1)拖入「JSON解析」算子,解析格式正确的数据。如下图所示:
2)点击「数据预览」,如下图所示:
2.5 输出数据
拖入「DB表输出」算子,将解析后的数据输出。写入方式选择「追加写入数据」(根据实际情况设置)。如下图所示:
2.6 效果查看
点击上方「运行」按钮,运行成功后,日志如下图所示:
数据落库后如下图所示:
2.7 后续步骤
可点击「发布」按钮,将定时任务发布到生产模式,如下图所示:
生产模式中,点击调度计划,可设置任务执行频率。