反馈已提交

网络繁忙

Flink SQL

  • 文档创建者:Wendy123456
  • 历史版本:9
  • 最近更新:Wendy123456 于 2026-06-10
  • icon提示:

    产品注册版本为「v5.0」时,才能使用数据开发-实时任务:

    1) 如需申请免费试用或增购功能点,请填写链接:实时任务意向使用申请表

    2) 如需了解更多实时任务详细信息,可联系技术支持(技术支持联系方式:前往「服务」,选择「在线支持」、电话「400-811-8890」)。

    1. 概述

    1.1 版本

    FineDataLink 版本功能变动
    5.0.1.4
    支持使用 Flink SQL 算子
    5.0.1.5实时任务中若使用 WebSocket输入MQTT输入IBM MQ输入Pulsar输入,不能再使用Flink SQL」节点
    5.0.11.2新增「事件时间」字段,可以定义事件时间,满足用户基于事件时间的转换操作

    1.2 应用场景

    • 实时任务发展前期,可视化算子的单行转换能力、多行聚合能力不足以满足客户各式各样的转换需求。

    • 用户习惯使用 SQL 语句进行数据开发,但目前实时任务中不支持编写 SQL 语句处理数据。

    1.3 功能简介

    新增Flink SQL」算子,用户可编写 SQL 语句在实时任务中对数据进行查询和处理。如下图所示:

    1780539551919805.png

    1.4 前提条件

    使用「Flink SQL算子前,需要:

    2. 功能说明

    1780539536415979.png

    2.1 SQL语句

    注:不支持在一个「Flink SQL」算子中,使用多个 select 语句。

    设置项说明
    函数支持的函数:Flink相关函数
    支持、不支持语句支持语句:select、withwheredistincttumbletablehop cumulate session grouping setsrollupcube group by havingoverorder by
    不支持语句:joinsemiunionintersectexceptin existslimit、create、drop、alter、Insert、analyze、describe、truncate、explain、use、show、load、show、load、unload、set、reset、jar、job、update、delete、call
    使用方式

    1)在 SQL 中,可以通过 ${valueName} 的方式,使用自定义的任务参数

    2)在输入 SQL 语句时有联想功能,支持联想 关键词 、函数、任务静态参数、上游算子

    2.2 事件时间字段

    5.0.11.2 及之后版本新增该字段。

    2.2.1 应用场景

    • 用户需要基于 FDL,进行销售分析,统计每 10 分钟内每种商品售卖的情况,数据来自于 CRM 系统数据库。

    • 需要基于设备的原始数据进行设备监控,数据来源是 Kafka,想要分析每 1 分钟设备的平均温度。

    5.0.11.2 之前版本,用户基于时间窗口的汇总计算时,由于没有定义事件时间,无法进行转换,Flink SQL 会进行报错。

    2.2.2 功能说明

    1780539509982973.png

    事件时间字段说明:

    设置项
    说明
    字段名默认为event_time,支持修改
    字段类型

    默认为timestamp(3),不支持用户修改 

    • 当「来自上游字段」类型为timestamp(3),event_time 字段类型默认逻辑:复制原始字段

    • 「来自上游字段」类型不为timestamp(3),event_time字段类型默认逻辑:复制原始字段并转换类型为 TIMESTAMP(3)

    • 当「来自上游字段」精度不为 3 时,event_time字段默认逻辑:精度超过3,舍弃为3,精度小于3,补齐为3

    来自上游字段

    1)仅允许选择一个字段作为事件时间字段;字段选择为实际的业务字段,即数据中含有的字段,是业务发生的时间,而不是数据产生的时间

    2) 支持的字段类型: 

    • 数值类型:bigint、long 

    • 字符类型:string、varchar 

    • 日期类型:date、time、timestamp、timestamp_ltz

    3)选择上游字段后,工程会直接定义watermark,默认值为 30 s;用户可修改 FineDB 参数来修改 watermark,具体操作请联系技术支持

    SQL语句说明:

    需要先定义「事件时间字段」,再在 SQL 语句中使用窗口函数:

    函数类别
    函数全名/语法功能描述SQL示例
    窗口定义滚动窗口/tumbleTUMBLE 函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口SELECT * FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);

    滑动窗口/hop滑动窗口函数指定元素到一个定长的窗口中。和滚动窗口很像,有窗口大小参数,另外增加了一个窗口滑动步长参数。如果滑动步长小于窗口大小,就能产生数据重叠的效果。在这个例子里,数据可以被分配在多个窗口SELECT * FROM HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES);

    累计窗口/cumulateCUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小SELECT * FROM CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES);
    窗口聚合窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据
    点击展开更多

    滚动窗口:SELECT window_start, window_end, SUM(price) AS total_price FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES) GROUP BY window_start, window_end;

    滑动窗口:SELECT window_start, window_end, SUM(price) AS total_price FROM HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) GROUP BY window_start, window_end;

    累计窗口:SELECT window_start, window_end, SUM(price) AS total_price FROM CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES) GROUP BY window_start, window_end;


    窗口

    TOP-N

    窗口 Top-N 是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。
    点击展开更多
    SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) -- relation applied windowing TVF WHERE rownum <= N [AND conditions]


    窗口去重窗口去重是一种特殊的去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据
    点击展开更多
    SELECT * FROM ( SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownum FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) ) WHERE rownum <= 1;

    3. 操作步骤

    本文示例:使用「CDC输入算子取出 MySQL 数据,使用Flink SQL」算子将取出的 name 字段根据不同值进行分组,并统计每个不同 name 字段出现的次数,返回 name 值和对应的出现次数。

    3.1 设置前置算子

    注1:「Flink SQL」算子前必须接一个前置算子。

    注2:5.0.1.5 及之后版本,实时任务中若使用 WebSocket输入MQTT输入IBM MQ输入Pulsar输入,不能再使用Flink SQL」节点

    1)新建实时任务。

    2)拖入一个「CDC输入算子,取出待处理数据。如下图所示:

    3.png

    点击「数据预览,可查看取出的数据。如下图所示:

    6.png

    3.2 设置 Flink SQL 算子

    拖入Flink SQL」算子,将取出的 name 字段根据不同值进行分组,并统计每个不同 name 字段出现的次数,返回 name 值和对应的出现次数。如下图所示:

    7.png

    点击「数据预览,如下图所示:

    1753237652889991.png

    3.3 后续步骤

    用户可继续处理数据,或者后接输出算子将数据输出。

    注:Flink SQL」算子后可接多个输出算子。




    附件列表


    主题: 数据开发-实时任务
    • 有帮助
    • 没帮助
    • 只是浏览
    中文(简体)

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭



    AI

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持