当前位置: 首页 > news >正文

FlinkSql(详细讲解二)

以下是 Flink SQL 进阶内容的讲解,聚焦流处理核心场景(窗口计算、流表关联)、函数扩展及 CDC 数据同步,包含具体场景和示例,便于理解和实践。

一、窗口计算(流处理核心)

流数据是 “无界” 的,无法直接像批处理那样计算 “全量数据”。窗口(Window)通过 “时间或数量” 将无界流切分为 “有界的小批量数据”,实现对动态数据的分段计算(如 “每 5 分钟统计一次订单量”)。

Flink SQL 支持 时间窗口(基于时间划分)和 计数窗口(基于数据条数划分),其中时间窗口最常用,又分以下 3 类:

1. 滚动窗口(TUMBLE Window)
  • 特点:窗口长度固定,无重叠,按固定时间间隔 “切分” 流数据。
  • 适用场景:固定周期统计(如每小时汇总、每日报表)。
  • 语法
    TUMBLE(时间字段, 窗口长度)  -- 时间字段:事件时间或处理时间;窗口长度:如 INTERVAL '5' MINUTE(5分钟)
    
  • 示例:按事件时间统计每 5 分钟的用户点击量
    基于上一次创建的 kafka_user_behavior 表(含 event_time 事件时间和水印):
    SELECTTUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,  -- 窗口开始时间TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,      -- 窗口结束时间COUNT(*) AS click_count
    FROM kafka_user_behavior
    WHERE behavior = 'click'  -- 只统计点击行为
    GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);  -- 按滚动窗口分组
    
    • 结果会每 5 分钟输出一次该窗口内的点击总量,窗口时间基于数据的实际产生时间(event_time)。
2. 滑动窗口(HOP Window)
  • 特点:窗口长度固定,但有重叠(滑动步长 < 窗口长度),可更频繁地更新统计结果。
  • 适用场景:需要实时性更高的统计(如每 1 分钟统计过去 5 分钟的订单量)。
  • 语法
    HOP(时间字段, 滑动步长, 窗口长度)  -- 滑动步长:窗口移动的间隔
    
  • 示例:每 1 分钟统计过去 5 分钟的用户购买量
    SELECTHOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_end,COUNT(*) AS buy_count
    FROM kafka_user_behavior
    WHERE behavior = 'buy'
    GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);
    
    • 窗口长度 5 分钟,每 1 分钟滑动一次,因此相邻窗口会有 4 分钟的重叠数据。
3. 会话窗口(SESSION Window)
  • 特点:无固定长度,根据 “数据活跃度” 划分 —— 如果一段时间(会话超时时间)内没有新数据,则窗口关闭。
  • 适用场景:分析用户会话行为(如用户连续操作期间的行为汇总,间隔超过 30 分钟视为新会话)。
  • 语法
    SESSION(时间字段, 超时时间)  -- 超时时间:如 INTERVAL '30' MINUTE(30分钟无数据则窗口关闭)
    
  • 示例:统计用户会话内的行为总数(超时 30 分钟)
    SELECTuser_id,  -- 按用户分组,每个用户单独计算会话SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,COUNT(*) AS total_behavior
    FROM kafka_user_behavior
    GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);
    
    • 同一用户如果 30 分钟内有新行为,窗口会持续延长;超过 30 分钟无行为,窗口关闭并输出结果。
窗口的 “迟到数据” 处理

流数据可能因网络延迟等原因 “迟到”(数据到达时所属窗口已关闭)。Flink SQL 通过 水印(Watermark) 控制窗口关闭时机,可配置 “允许迟到时间”:

-- 在创建表时定义水印,允许数据迟到10秒(窗口关闭后10秒内的迟到数据仍会被处理)
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 基础水印(提前5秒关闭窗口)
-- 结合窗口时,实际允许迟到时间 = 水印延迟 + 窗口长度(视具体配置)

  • 超过允许时间的迟到数据默认被丢弃,可通过 ALLOWLATESTDATA() 函数捕获(需结合状态后端配置)。

二、流表关联(JOIN)

Flink SQL 支持多表关联(类似传统 SQL 的 JOIN),但流表关联需考虑 “无界数据” 的特性(数据持续到来,无法一次性关联所有数据),因此有特殊的关联逻辑。

常用的流表 JOIN 类型:

1. 常规关联(Regular Join)
  • 特点:类似批处理 JOIN,关联所有历史数据和新数据,但会存储所有表的历史数据(状态),可能导致状态无限增长。
  • 适用场景:小表关联(如字典表,数据量小且更新少)。
  • 语法:支持 INNER JOINLEFT JOINRIGHT JOIN,需指定关联条件。
  • 示例:用户行为表关联用户信息表(小表)
    -- 假设已创建用户信息表(静态小表,如 MySQL 中的用户表)
    CREATE TABLE user_info (user_id BIGINT PRIMARY KEY NOT ENFORCED,user_name VARCHAR,age INT
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/test','table-name' = 'user_info'
    );-- 关联用户行为和用户信息
    SELECT b.user_id, i.user_name, b.behavior 
    FROM kafka_user_behavior b
    INNER JOIN user_info i 
    ON b.user_id = i.user_id;  -- 关联条件:用户ID
    
    • 注意:user_info 是小表时,Flink 会将其加载到内存;如果是大表,会持续存储状态,需谨慎使用。
2. 时态关联(Temporal Join)
  • 特点:关联 “某个时间点” 的数据版本(解决 “维度表更新” 问题),例如 “订单发生时的商品价格”(商品价格可能随时间变化)。
  • 适用场景:动态维度表关联(如商品表、用户等级表,数据会更新)。
  • 语法:需指定 “版本时间字段” 和 “关联时间”。
  • 示例:订单流关联商品表的 “当时价格”
    -- 商品表(含价格和更新时间,动态更新)
    CREATE TABLE products (product_id BIGINT PRIMARY KEY NOT ENFORCED,price DECIMAL(10,2),update_time TIMESTAMP(3)  -- 价格更新时间(版本时间)
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/test','table-name' = 'products'
    );-- 订单流(含下单时间)
    CREATE TABLE orders (order_id BIGINT,product_id BIGINT,order_time TIMESTAMP(3),  -- 下单时间WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
    );-- 关联订单发生时的商品价格(时态关联)
    SELECT o.order_id,o.product_id,p.price AS order_price,  -- 下单时的价格o.order_time
    FROM orders o
    LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time p  -- 关联订单时间点的商品版本
    ON o.product_id = p.product_id;
    
    • FOR SYSTEM_TIME AS OF o.order_time 表示:取 orders 表中 order_time 时刻的 products 表数据版本。
3. 间隔关联(Interval Join)
  • 特点:关联两个流中 “时间在一定范围内” 的数据(解决 “流 - 流关联” 的时间对齐问题)。
  • 适用场景:两个流的数据有时间关联性(如 “用户点击后 30 分钟内的购买行为”)。
  • 语法:通过 BETWEEN 定义时间范围。
  • 示例:关联点击行为后 30 分钟内的购买行为
    -- 点击流(筛选出点击行为)
    CREATE VIEW clicks AS
    SELECT user_id, item_id, event_time AS click_time
    FROM kafka_user_behavior
    WHERE behavior = 'click';-- 购买流(筛选出购买行为)
    CREATE VIEW buys AS
    SELECT user_id, item_id, event_time AS buy_time
    FROM kafka_user_behavior
    WHERE behavior = 'buy';-- 关联同一用户、同一商品,且购买在点击后30分钟内的数据
    SELECTc.user_id,c.item_id,c.click_time,b.buy_time
    FROM clicks c
    INNER JOIN buys b
    ON c.user_id = b.user_id AND c.item_id = b.item_idAND b.buy_time BETWEEN c.click_time AND c.click_time + INTERVAL '30' MINUTE;  -- 时间范围
    
    • 仅关联满足 “用户、商品相同” 且 “购买在点击后 30 分钟内” 的数据,避免无限制存储状态。

三、函数扩展(内置函数 + 自定义函数)

Flink SQL 提供丰富的函数处理数据,除基础函数(COUNTSUBSTRING 等)外,还有流处理特有的函数,也支持自定义函数扩展。

1. 常用内置函数(进阶)
  • 时间函数(流处理核心):

    • TO_TIMESTAMP_LTZ(ts, 3):将毫秒时间戳(BIGINT)转换为带时区的事件时间(TIMESTAMP(3))。
    • PROCTIME():获取数据被 Flink 处理的时间(处理时间),需在表定义中声明:
      CREATE TABLE t (id INT,pt AS PROCTIME()  -- 定义处理时间字段 pt
      ) WITH (...);
      
    • CURRENT_WATERMARK():获取当前水印时间(用于判断数据是否迟到)。
  • 聚合函数

    • SUM()AVG():常规聚合,流处理中会持续更新结果。
    • COLLECT_LIST(col):收集某字段的所有值为数组(适用于会话分析)。
2. 自定义函数(UDF/UDAF/UDTF)

当内置函数满足不了需求时,可自定义函数(需用 Java/Scala 开发,或通过 Python 调用)。

示例:自定义 UDF(字符串反转)
  1. 用 Java 实现 UDF(需继承 ScalarFunction):
    import org.apache.flink.table.functions.ScalarFunction;public class ReverseString extends ScalarFunction {// 函数逻辑:反转字符串public String eval(String s) {if (s == null) return null;return new StringBuilder(s).reverse().toString();}
    }
    
  2. 打包成 JAR,上传到 Flink 集群,在 SQL Client 中注册:
    -- 注册 UDF
    CREATE TEMPORARY FUNCTION reverse_str AS 'com.example.ReverseString' USING JAR 'file:///path/to/your.jar';
    
  3. 使用自定义函数:
    SELECT user_id, reverse_str(behavior) AS reversed_behavior 
    FROM user_behavior;
    

四、CDC 数据同步(变更数据捕获)

CDC(Change Data Capture)用于捕获数据库的变更(INSERT/UPDATE/DELETE),Flink SQL 可通过 CDC 连接器将数据库变更同步到其他系统(如 Kafka、数据仓库),实现实时数据同步。

以 MySQL CDC 为例(需先开启 MySQL 的 binlog):

1. 准备工作
  • 开启 MySQL binlog(在 my.cnf 中配置):
    server-id = 1
    log_bin = /var/log/mysql/mysql-bin.log
    binlog_format = ROW  # 必须为 ROW 格式
    
  • 下载 Flink CDC 连接器 JAR(如 flink-connector-mysql-cdc-2.4.1.jar),放入 Flink 的 lib 目录,重启集群。
2. 创建 MySQL CDC 源表
CREATE TABLE mysql_products_cdc (product_id BIGINT PRIMARY KEY NOT ENFORCED,product_name VARCHAR,price DECIMAL(10,2),update_time TIMESTAMP(3)
) WITH ('connector' = 'mysql-cdc',  -- CDC 连接器'hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'test',  -- 数据库名'table-name' = 'products',  -- 表名'scan.startup.mode' = 'initial'  -- 初始同步全量数据,之后同步增量变更
);

  • 该表会实时捕获 products 表的所有变更(新增、更新、删除)。
3. 处理 CDC 变更数据

CDC 表会包含一个隐藏字段 __db_operation(表示操作类型:+I 插入、-U 旧值、+U 新值、-D 删除),可用于区分变更类型:

-- 筛选出所有更新操作的新值
SELECT product_id, product_name, price, '__db_operation' AS op
FROM mysql_products_cdc
WHERE '__db_operation' = '+U';
4. 同步变更到 Kafka
-- 创建 Kafka 汇表(存储变更数据)
CREATE TABLE kafka_products_sink (product_id BIGINT,product_name VARCHAR,price DECIMAL(10,2),op VARCHAR  -- 存储操作类型
) WITH ('connector' = 'kafka','topic' = 'products_changes','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);-- 同步 CDC 变更到 Kafka
INSERT INTO kafka_products_sink
SELECT product_id, product_name, price, '__db_operation' AS op
FROM mysql_products_cdc;

第二次总结

本次讲解了 Flink SQL 的进阶功能:

  1. 窗口计算:滚动、滑动、会话窗口的使用,解决流数据分段统计问题;
  2. 流表关联:常规关联、时态关联、间隔关联,适配不同场景的多表联合分析;
  3. 函数扩展:内置时间函数及自定义函数的开发,满足复杂数据转换需求;
  4. CDC 同步:通过 MySQL CDC 捕获数据库变更,实现实时数据同步。

这些功能覆盖了流批统一处理的核心场景,结合基础内容,可应对大部分实时数据处理需求(如实时报表、实时风控、数据同步等)。实际使用时需注意状态管理(避免状态过大)和性能调优(如窗口并行度、水印延迟配置)。

http://www.xdnf.cn/news/1279927.html

相关文章:

  • IDE认知革命:JetBrains AI Assistant插件深度调教手册(终极实战指南)
  • 服务器配置实战:从 “密码锁” 到 “分工协作” 的知识点详解
  • POI导入时相关的EXCEL校验
  • Spring Boot Excel数据导入数据库实现详解
  • 缓存的三大问题分析与解决
  • Flink + Hologres构建实时数仓
  • MSE ZooKeeper:Flink高可用架构的企业级选择
  • 容器之王--Docker的安全优化详解及演练
  • 在Mac 上生成GitLab 的SSH 密钥并将其添加到GitLab
  • Django Request 与 DRF Request 的区别
  • (Arxiv-2025)Phantom:通过跨模态对齐实现主体一致性视频生成
  • 什么情况下会导致日本服务器变慢?解决办法
  • 第2节 大模型分布式推理架构设计原则
  • AIStarter修复macOS 15兼容问题:跨平台AI项目管理新体验
  • MySQL权限管理和MySQL备份
  • 大模型落地实践:从技术重构到行业变革的双重突破
  • C/C++练习面试题
  • Selenium动态元素定位
  • 【运维进阶】WEB 服务器
  • 学习观察和行动:机器人操作中任务-觉察的视图规划
  • docker安装searxng
  • C语言如何安全的进行字符串拷贝
  • 云原生环境 Prometheus 企业级监控实战
  • Centos 用http ftp搭建本地yum源 保姆级教程
  • QML开发:动画元素
  • 企业高性能web服务器Nginx的详细部署(实战篇)
  • [4.2-2] NCCL新版本的register如何实现的?
  • ResponseBodyAdvice是什么?
  • ChatML vs Harmony:深度解析OpenAI全新对话结构格式的变化
  • ARM基础概念 day51