当前位置: 首页 > news >正文

Flink 系列之二十九- Flink SQL - 中间算子:窗口聚合

之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。

注意由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19
代码参考:https://github.com/forever1986/flink-study.git

目录

  • 1 Window Top-N
    • 1.1 说明
    • 1.2 示例演示
  • 2 Window Deduplication
    • 2.1 说明
    • 2.2 示例演示

上一章针对了FlinkSQL中的水位线和窗口进行了讲解,那么这一章结合前面讲到的基于OVER函数的聚合TopN和Deduplication,实现在有窗口定义下的使用,下面统一做演示。

1 Window Top-N

1.1 说明

在《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中讲过使用OVER函数来实现Top-N。这里加入上一章讲解的窗口,按照需求查询每个窗口的Top-N,以下是Top-N的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]

注意事项

  • 1)其中table_name替换为VTF窗口的写法,即可做到开窗求Top-N
  • 2)Window Top-N 需要 PARTITION BY 子句包含 窗口表值函数 或 窗口聚合 产生的 window_start 和 window_end
  • 3)不支持会话窗口

1.2 示例演示

示例说明:通过随机生成服务器cpu值,进行滚动窗口5秒内排名top-2的cpu值。为了演示说明方便,这里只生成一台服务器的数据

1)创建log表:使用它datagen的connector随机生成cpu值

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='1','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)结合Top-N和窗口查询:5秒长度滚动窗口,窗口内排名前2的cpu值

SET sql-client.execution.result-mode=TABLEAU; 
SELECTid,cpu,ts,rownum,window_start, window_end
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cpu desc) as rownum,window_start, window_endFROM TABLE(TUMBLE(TABLE log, DESCRIPTOR(ts), INTERVAL '5' SECOND))
)
WHERE rownum<=2;

在这里插入图片描述

说明:从上图可以看到每个窗口不断地打印出前2的cpu值,[15,20)的窗口只打印一条是刚好这个窗口只有1条数据(19秒的时候生产的)

2 Window Deduplication

2.1 说明

在《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中讲过使用OVER函数来实现Deduplication。这里加入上一章讲解的窗口,按照需求查询每个窗口的去重Deduplication,以下是Deduplication的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

注意事项

  • 1)其中table_name替换为VTF窗口的写法,即可做到开窗去重
  • 2)Window Top-N 需要 PARTITION BY 子句包含 窗口表值函数 或 窗口聚合 产生的 window_start 和 window_end
  • 3)不支持会话窗口。不支持处理时间,只支持事件时间

2.2 示例演示

示例说明:通过随机生成服务器cpu值,进行滚动窗口5秒内每台服务器最大cpu值。

1)创建log表:使用它datagen的connector随机生成cpu值

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='2','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)结合Top-N和窗口查询:5秒长度滚动窗口,窗口内不同服务器最大的cpu值

SET sql-client.execution.result-mode=TABLEAU;
SELECTid,cpu,ts,rownum,window_start, window_end
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY window_start, window_end, id ORDER BY cpu desc) as rownum,window_start, window_endFROM TABLE(TUMBLE(TABLE log, DESCRIPTOR(ts), INTERVAL '5' SECOND))
)
WHERE rownum = 1;

在这里插入图片描述

说明:从上图可以看到每个窗口不断地打印出最大的cpu值

结语:本章主要是将《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中的Top-N和Deduplication场景在有窗口定义下的情况进行演示。下章还是继续回到FlinkSQL的中间算子。

http://www.xdnf.cn/news/1010791.html

相关文章:

  • Ubuntu安装RTX5090显卡驱动
  • Java开发中常见的语法陷阱与规避方法
  • ThreadPoolTaskExecutor+CompletableFuture实现多线程异步数据同步和自定义线程池监控和动态调整实现
  • 网络原理9-HTTP2
  • 三轴云台之运动控制系统篇
  • C++ 语言基础之数据类型详解
  • LangGraph入门教程:构建循环状态管理的LLM应用
  • 哪些方面可以做PCDN
  • Memory Repair (五)
  • SMB协议在Windows内网中的核心地位
  • Java SE - 继承与多态
  • 广东省省考备考(第二十七天6.12)—言语:逻辑填空(练习)
  • Sentinel 流量控制安装与使用
  • 【游戏设计】游戏视角类型及核心特点分析
  • 脑电震动音频震动信号模拟器设计资料:758-2路32bit DA 脑电震动音频信号模拟器
  • 单连杆倾角估计:互补滤波器的 MATLAB 仿真实现
  • 【Python打卡Day35】模型可视化与推理@浙大疏锦行
  • bindService 和 startService 生命周期对比
  • JavaWeb期末速成 Servlet
  • qemu-guest-agent详解
  • 亚马逊woot常见问题第三弹
  • LevelDB介绍和内部机制
  • CC工具箱使用指南:【面要素四至】
  • 深度学习5——循环神经网络
  • 智能PDU:从单一功能到多维度升级
  • 洛谷P4555 最长双回文串
  • Ntfs!NtfsFreeRestartTableIndex函数分析
  • 图片压缩工具类
  • Photoshop 2025 性能配置全攻略:硬件选购与软件优化指南
  • 医疗器械行业系统如何提升医疗器械企业的核心竞争力?