当前位置: 首页 > ops >正文

Flink 系列之二十七 - Flink SQL - 中间算子:OVER聚合

之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。

注意由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19
代码参考:https://github.com/forever1986/flink-study.git

目录

  • 1 OVER
    • 1.1 说明
    • 1.2 示例
  • 2 Top-N
    • 2.1 说明
    • 2.2 示例演示
  • 3 Deduplication(去重)
    • 3.1 说明
    • 3.2 示例演示

上一章讲了简单的聚合函数,在FlinkSQL中有一个函数OVER,它是一种特殊的开窗函数,按照之前Data Stream API的窗口理解,over函数有点跟滑动窗口是相似,当然FlinkSQL也有跟Data Stream API相匹配的开窗语法,这个后面再讲。这一章主要是关于OVER函数的使用。

1 OVER

1.1 说明

在标准SQL中有一个比较特殊的聚合方式 overover 可以以输入某一行数据为开始基准,计算它之前某个时间段或者某个数量的数据的聚合值,这over就是所谓的 开窗函数 ,简单理解 over 是一种特殊的开窗方式。以下是其语法,几乎和标准SQL的over函数一样:

SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...

说明:关于OVER的语法,这里特意说明一下

  • 1)range_definition:有两种定义方式,一种是基于时间(RANGE BETWEEN … AND …),一种是基于条数(ROWS BETWEEN … AND …)
  • 2)ORDER BY:ORDER BY后面的字段必须是时间属性,并且该时间属性必须是升序,不能降序

1.2 示例

示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线(水位线语法后续会详细讲解,这里就简单理解某个字段作为水位线)。然后通过select来演示over函数

1)创建log表:定义一个log表。

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND  --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)最近10秒的最大CPU值:通过over函数实现不同服务器最近10秒的最大cpu值

SET sql-client.execution.result-mode=TABLEAU; 
SELECT id , cpu, ts, max(cpu) OVER(PARTITION BY id ORDER BY ts RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS max_cpu FROM log;

在这里插入图片描述

从上图中,挑选id=5的服务器来看:

  • 1)第1条cpu=47.0140189246018,因此最大值是47.0140189246018。
  • 2)第2条cpu=73.34772303171671,它的最近10秒包括第1条和第2条,因此最大值是73.34772303171671
  • 3)第3、4、5条,它的最近10秒包括第2条,它们的值没超过第2条,因此最大值是73.34772303171671
  • 4)第6条cpu=73.34404846246665,它的最近10秒包括第3条、第4条、第5条和第6条,并没有包括第1条和第2条,因此最大值是第6条的值:73.34404846246665

3)最近3条的平均CPU值:通过over函数实现不同服务器最近3条的平均cpu值

SELECT id , cpu, ts, avg(cpu) OVER(PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS avg_cpu FROM log;  -- 注意这里是前2条,因为自己本身这条也算,因此就是最新3条;

在这里插入图片描述

从上图中,还是挑选id=5的服务器来看:

  • 1)第1条cpu=94.05246175428493,因此平均值是94.05246175428493。
  • 2)第2条cpu=50.151188309073596,它的最近3条包括第1条和第2条,因此平均值是72.10182503167927
  • 3)第3条cpu=63.592730465576764,它的最近3条包括第1条、第2条和第3条,因此平均值是69.26546017631176
  • 4)第4条cpu=72.41806173315702,它的最近3条包括第2条、第3条和第4条,因此平均值是62.05399350260245

4)定义子句外部的窗口:可以将over函数的内容定义在select外部,这样可以重复利用over函数

将第2)步的最近10秒的最大CPU值改造如下:

SET sql-client.execution.result-mode=TABLEAU; 
SELECT id , cpu, ts, max(cpu) OVER w AS max_cpu 
FROM log
window w as (PARTITION BY id ORDER BY ts RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW);

2 Top-N

2.1 说明

在实际应用场景中,经常会有排名之类的需求,比如销量排名前10的产品等。FlinkSQL提供了这样需求的写法,其实还是利用 over 函数的开窗,只不过有些内容不一样,具体语法如下:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]

说明:

  • 1)ROW_NUMBER():根据分区中行的顺序,为每行分配一个唯一的序列号,从 1 开始。目前,只支持作为 over window 函数
  • 2)PARTITION BY col1[, col2…]:指定分区列。每个分区都将有一个 Top-N 结果。
  • 3)ORDER BY col1 [asc|desc][, col2 [asc|desc]…]:指定排序列。不同列的排序方向可能不同。这里可以看出可以升序和降序,同时并要求是时间属性的列
  • 4)WHERE rownum <= N:识别此查询是 Top-N 查询。N 表示将保留 N 个最小或最大的记录。rownum <= N
  • 5)[AND conditions]:除了rownum <= N,在 where 子句中可以自由添加其他条件,但只能是AND的组合

2.2 示例演示

示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线。然后通过select来演示over函数。希望取每个服务器排名前3的cpu值。

1)创建log表:定义一个log表。

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND  --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)查询同一台服务器cpu值为top-3

SELECTid,cpu,ts,rownum
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY id ORDER BY cpu desc) as rownumFROM log
)
WHERE rownum<=3;

在这里插入图片描述

说明:这里以id=1的数据来说明

  • 1)第1条数据进来,排名第1是它自己
  • 2)第2条数据进来,其值比原先第1条高,因此回撤第1条数据,将排名第1的数据改为第2条,再将第一条数据插入为排名第2的数据

3 Deduplication(去重)

3.1 说明

在实际业务场景下,还经常有这么一个需求:按照某个字段重复,只保留第一条或最后一条。比如获取某一台服务器最新的cpu值。下面是其语法结构,其实可以看出就是Top-N中N=1的场景。

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name)
WHERE rownum = 1

注意:其ORDER BY必须是时间属性

3.2 示例演示

示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线。然后通过select来演示over函数。希望获取服务器id不重复的最新一条数据。

1)创建log表:定义一个log表。

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND  --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)查询同一台服务器cpu值为top-3

SELECTid,cpu,ts,rownum
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY id ORDER BY ts desc) as rownumFROM log
)
WHERE rownum = 1;

在这里插入图片描述

说明:这里以id=3的数据来说明

  • 1)第1条数据进来,因此是它自己
  • 2)第2条数据进来,由于SQL里面是ts降序的,这时候ts时间比第1条更晚,因此-U撤回第1条数据,+U更新最新的数据。

结语:本章针对FlinkSQL的OVER函数进行详细的讲解,同时也讲解两个特殊场景Top-N和Deduplication。本章中还涉及到一个水位线的定义,接下来的中间算子或多或少会涉及水位线,那么接下来章节会先讲解如何在FlinkSQL的实现水位线和窗口,再讲解其它FlinkSQL的中间算子。

http://www.xdnf.cn/news/13473.html

相关文章:

  • 国内电商API接口平台排名与解析
  • 2025年深度学习+多目标优化最新创新思路
  • 学习笔记087——Java接口和抽象类的区别和使用
  • 对比**CMake** 和 **PlatformIO** 构建嵌入式项目方式
  • C++(5)
  • Wordpress安装插件提示输入ftp问题解决
  • AIStarter一键启动平台:轻松运行AI项目,无需复杂配置
  • 五种IO模型与阻塞IO
  • LeetCode - 1047. 删除字符串中的所有相邻重复项
  • dockerfile 简单搭建 和 supervisor 进程管理工具
  • JAVASE:方法
  • 亚远景-ASPICE在汽车软件全生命周期管理中的作用
  • 7. 整数反转
  • 探索奇妙的LLM应用:提高工作效率的AI代理和RAG合集
  • Jemily张洁领域成就概述:匠心筑品牌,革新引航家用电梯新征程
  • 31.Python编程实战:自动化批量压缩与解压文件
  • GoldenDB简述
  • 【DVWA系列】——xss(DOM)——High详细教程
  • debian12 修改MariaDB数据库存储位置报错
  • 界面控件Kendo UI在实战应用——打通数据链路,重塑业务效率
  • UE5 蓝图按键控制物体旋转、暂停
  • Android NDK: Could not find application project directory
  • 【Mac技巧】修复Mac应用程序无法打开的解决办法
  • tryhackme 之反弹 shell 理解
  • FastAPI的数据契约:Pydantic与SQLModel联手打造健壮API
  • 斐讯N1部署Armbian与CasaOS实现远程存储管理
  • JS之Dom模型和Bom模型
  • strs[0] == “0“是否为字符串内容比较
  • 在GIS 工作流中实现数据处理(2)
  • 想考Kubernetes认证?CKA考试内容与报名全解析