Flink 系列之二十七 - Flink SQL - 中间算子:OVER聚合
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。
注意:由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19。
代码参考:https://github.com/forever1986/flink-study.git
目录
- 1 OVER
- 1.1 说明
- 1.2 示例
- 2 Top-N
- 2.1 说明
- 2.2 示例演示
- 3 Deduplication(去重)
- 3.1 说明
- 3.2 示例演示
上一章讲了简单的聚合函数,在FlinkSQL中有一个函数OVER,它是一种特殊的开窗函数,按照之前Data Stream API的窗口理解,over函数有点跟滑动窗口是相似,当然FlinkSQL也有跟Data Stream API相匹配的开窗语法,这个后面再讲。这一章主要是关于OVER函数的使用。
1 OVER
1.1 说明
在标准SQL中有一个比较特殊的聚合方式 over 。 over 可以以输入某一行数据为开始基准,计算它之前某个时间段或者某个数量的数据的聚合值,这over就是所谓的 开窗函数 ,简单理解 over 是一种特殊的开窗方式。以下是其语法,几乎和标准SQL的over函数一样:
SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...
说明:关于OVER的语法,这里特意说明一下
- 1)range_definition:有两种定义方式,一种是基于时间(RANGE BETWEEN … AND …),一种是基于条数(ROWS BETWEEN … AND …)
- 2)ORDER BY:ORDER BY后面的字段必须是时间属性,并且该时间属性必须是升序,不能降序
1.2 示例
示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线(水位线语法后续会详细讲解,这里就简单理解某个字段作为水位线)。然后通过select来演示over函数
1)创建log表:定义一个log表。
CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);
2)最近10秒的最大CPU值:通过over函数实现不同服务器最近10秒的最大cpu值
SET sql-client.execution.result-mode=TABLEAU;
SELECT id , cpu, ts, max(cpu) OVER(PARTITION BY id ORDER BY ts RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS max_cpu FROM log;
从上图中,挑选id=5的服务器来看:
- 1)第1条cpu=47.0140189246018,因此最大值是47.0140189246018。
- 2)第2条cpu=73.34772303171671,它的最近10秒包括第1条和第2条,因此最大值是73.34772303171671
- 3)第3、4、5条,它的最近10秒包括第2条,它们的值没超过第2条,因此最大值是73.34772303171671
- 4)第6条cpu=73.34404846246665,它的最近10秒包括第3条、第4条、第5条和第6条,并没有包括第1条和第2条,因此最大值是第6条的值:73.34404846246665
3)最近3条的平均CPU值:通过over函数实现不同服务器最近3条的平均cpu值
SELECT id , cpu, ts, avg(cpu) OVER(PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS avg_cpu FROM log; -- 注意这里是前2条,因为自己本身这条也算,因此就是最新3条;
从上图中,还是挑选id=5的服务器来看:
- 1)第1条cpu=94.05246175428493,因此平均值是94.05246175428493。
- 2)第2条cpu=50.151188309073596,它的最近3条包括第1条和第2条,因此平均值是72.10182503167927
- 3)第3条cpu=63.592730465576764,它的最近3条包括第1条、第2条和第3条,因此平均值是69.26546017631176
- 4)第4条cpu=72.41806173315702,它的最近3条包括第2条、第3条和第4条,因此平均值是62.05399350260245
4)定义子句外部的窗口:可以将over函数的内容定义在select外部,这样可以重复利用over函数
将第2)步的最近10秒的最大CPU值改造如下:
SET sql-client.execution.result-mode=TABLEAU;
SELECT id , cpu, ts, max(cpu) OVER w AS max_cpu
FROM log
window w as (PARTITION BY id ORDER BY ts RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW);
2 Top-N
2.1 说明
在实际应用场景中,经常会有排名之类的需求,比如销量排名前10的产品等。FlinkSQL提供了这样需求的写法,其实还是利用 over 函数的开窗,只不过有些内容不一样,具体语法如下:
SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]
说明:
- 1)ROW_NUMBER():根据分区中行的顺序,为每行分配一个唯一的序列号,从 1 开始。目前,只支持作为 over window 函数
- 2)PARTITION BY col1[, col2…]:指定分区列。每个分区都将有一个 Top-N 结果。
- 3)ORDER BY col1 [asc|desc][, col2 [asc|desc]…]:指定排序列。不同列的排序方向可能不同。这里可以看出可以升序和降序,同时并要求是时间属性的列
- 4)WHERE rownum <= N:识别此查询是 Top-N 查询。N 表示将保留 N 个最小或最大的记录。rownum <= N
- 5)[AND conditions]:除了rownum <= N,在 where 子句中可以自由添加其他条件,但只能是AND的组合
2.2 示例演示
示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线。然后通过select来演示over函数。希望取每个服务器排名前3的cpu值。
1)创建log表:定义一个log表。
CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);
2)查询同一台服务器cpu值为top-3:
SELECTid,cpu,ts,rownum
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY id ORDER BY cpu desc) as rownumFROM log
)
WHERE rownum<=3;
说明:这里以id=1的数据来说明
- 1)第1条数据进来,排名第1是它自己
- 2)第2条数据进来,其值比原先第1条高,因此回撤第1条数据,将排名第1的数据改为第2条,再将第一条数据插入为排名第2的数据
3 Deduplication(去重)
3.1 说明
在实际业务场景下,还经常有这么一个需求:按照某个字段重复,只保留第一条或最后一条。比如获取某一台服务器最新的cpu值。下面是其语法结构,其实可以看出就是Top-N中N=1的场景。
SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name)
WHERE rownum = 1
注意:其ORDER BY必须是时间属性
3.2 示例演示
示例说明:通过自动生成一个服务器的cpu值,其中ts 为当前时间,这里把它定义为watermark水位线。然后通过select来演示over函数。希望获取服务器id不重复的最新一条数据。
1)创建log表:定义一个log表。
CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND --watermark
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='5','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);
2)查询同一台服务器cpu值为top-3:
SELECTid,cpu,ts,rownum
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY id ORDER BY ts desc) as rownumFROM log
)
WHERE rownum = 1;
说明:这里以id=3的数据来说明
- 1)第1条数据进来,因此是它自己
- 2)第2条数据进来,由于SQL里面是ts降序的,这时候ts时间比第1条更晚,因此-U撤回第1条数据,+U更新最新的数据。
结语:本章针对FlinkSQL的OVER函数进行详细的讲解,同时也讲解两个特殊场景Top-N和Deduplication。本章中还涉及到一个水位线的定义,接下来的中间算子或多或少会涉及水位线,那么接下来章节会先讲解如何在FlinkSQL的实现水位线和窗口,再讲解其它FlinkSQL的中间算子。