当前位置: 首页 > web >正文

FlinkSql(详细讲解一)

Flink SQL 是 Flink 提供的基于 SQL 语法的流批统一处理框架,它屏蔽了底层 Flink 算子的复杂性,让用户可以用熟悉的 SQL 语句处理无界流数据和有界批数据。第一次讲解会聚焦基础概念、核心组件、环境搭建、数据类型、表的创建与管理、基础查询等内容,确保易懂且实用。

一、Flink SQL 核心概念

  1. 流批统一:Flink SQL 不区分 “流处理” 和 “批处理”,而是将批数据视为 “有界流”(数据处理完就结束),流数据视为 “无界流”(持续处理新数据),用同一套 SQL 语法处理两种场景。
  2. Table & View
    • Table(表):是 Flink SQL 中数据的核心载体,可对应外部存储(如 Kafka、MySQL、文件)或中间计算结果。表有 “源表(Source,读数据)” 和 “汇表(Sink,写数据)” 之分。
    • View(视图):是基于表或其他视图的逻辑查询,不存储实际数据,仅保存查询逻辑,用于简化复杂 SQL。
  3. Catalog(元数据目录):管理表、视图、函数等元数据的组件,默认使用内存 Catalog(重启后丢失),也支持 Hive Catalog(持久化元数据)。
  4. Connector(连接器):Flink SQL 通过连接器对接外部系统(如 Kafka、MySQL、HDFS),负责数据的读取(Source Connector)和写入(Sink Connector)。
  5. Function(函数):包括内置函数(如 COUNT()SUBSTRING())和自定义函数(UDF/UDAF/UDTF),用于数据转换和计算。

二、环境搭建:如何运行 Flink SQL?

Flink SQL 有两种常用运行方式:SQL Client(命令行工具) 和 代码集成(Java/Scala/Python)。这里重点讲 SQL Client(最直观,适合入门)。

1. 安装 Flink 并启动 SQL Client
  • 下载 Flink 安装包(推荐 1.17+ 版本,流批统一支持更完善):Flink 官网
  • 解压后启动集群(本地模式,仅用于测试):
    # 进入解压目录
    cd flink-1.17.0
    # 启动本地集群(包含 1 个 JobManager + 1 个 TaskManager)
    ./bin/start-cluster.sh
    
  • 启动 SQL Client:
    ./bin/sql-client.sh embedded  # embedded 表示嵌入集群模式(本地运行)
    

    启动成功后会进入 Flink SQL> 交互界面,可直接输入 SQL 语句执行。

三、Flink SQL 数据类型

Flink SQL 支持丰富的数据类型,和 SQL 标准类似,但有部分特殊类型(尤其是时间类型,对处理流数据很重要)。

1. 基本类型(常用)
类型名说明示例值
INT32 位整数123
BIGINT64 位整数12345678901
VARCHAR(n)可变长度字符串(n 为最大长度,可省略)'flink'
DOUBLE双精度浮点数3.14
BOOLEAN布尔值TRUE / FALSE
2. 复杂类型(处理嵌套数据)
类型名说明示例定义
ARRAY<T>元素类型为 T 的数组ARRAY[1,2,3](INT 数组)
MAP<K, V>键类型 K、值类型 V 的映射MAP['a' -> 1, 'b' -> 2]
ROW<f1 T1, f2 T2>结构化数据(类似 “对象”)ROW(1, 'flink')(INT + VARCHAR)
3. 时间类型(流处理核心)

流数据需要处理 “时间”(如窗口计算、延迟数据处理),Flink SQL 支持两种时间:

  • 事件时间(Event Time):数据产生的实际时间(如日志中的 ts 字段),需通过 “水印(Watermark)” 定义(见下文表创建)。
  • 处理时间(Processing Time):数据进入 Flink 被处理的时间(无需定义,直接用 PROCTIME() 函数获取)。

时间类型语法:

类型名说明示例
TIMESTAMP(3)带毫秒精度的时间(默认 3 位)'2023-10-01 12:00:00.123'
DATE日期(年 - 月 - 日)'2023-10-01'
TIME时间(时:分: 秒)'12:00:00'

四、创建表(核心操作)

表是 Flink SQL 处理数据的入口,创建表需指定:字段名、字段类型、连接器(对接外部系统)、格式(数据序列化方式),如果是流表还需定义水印(处理事件时间)。

语法模板
CREATE TABLE [IF NOT EXISTS] 表名 (字段名1 类型1,字段名2 类型2,-- (可选)定义事件时间字段和水印(流表必用)WATERMARK FOR 事件时间字段 AS 水印规则  -- 水印规则:通常是事件时间减延迟(如 5 秒)
) WITH (-- 连接器类型(如 'kafka'、'filesystem'、'mysql')'connector' = '连接器名称',-- 连接器特有配置(如 Kafka 的 bootstrap.servers、topic)'连接器参数1' = '值1','连接器参数2' = '值2',-- 数据格式(如 'csv'、'json'、'avro')'format' = '格式名称',-- 格式特有配置(如 CSV 的字段分隔符)'格式参数1' = '值1'
);
实战示例:创建源表和汇表
示例 1:创建一个读取 CSV 文件的源表(批处理场景)

假设本地有一个 user_behavior.csv 文件,内容如下(用户行为数据)

1001,2001,300,click,2023-10-01 10:00:00.000
1002,2002,300,buy,2023-10-01 10:00:05.000

创建表读取该文件:

CREATE TABLE user_behavior (user_id BIGINT,       -- 用户IDitem_id BIGINT,       -- 商品IDcategory_id INT,      -- 商品分类IDbehavior VARCHAR,     -- 行为(click/buy)ts TIMESTAMP(3)       -- 事件时间(毫秒精度)
) WITH ('connector' = 'filesystem',  -- 连接器:文件系统'path' = 'file:///本地路径/user_behavior.csv',  -- 文件路径(本地用 file:///,HDFS 用 hdfs:///)'format' = 'csv',            -- 格式:CSV'csv.field-delimiter' = ','  -- CSV 字段分隔符(默认逗号,可省略)
);
示例 2:创建一个 Kafka 源表(流处理场景,带水印)

Kafka 中存储用户行为的 JSON 数据(格式:{"user_id":1001, "item_id":2001, "ts":1696120800000}),创建表读取并定义水印:

CREATE TABLE kafka_user_behavior (user_id BIGINT,item_id BIGINT,ts BIGINT,  -- Kafka 中 ts 是毫秒时间戳(Long 类型)-- 将 ts 转换为事件时间(TIMESTAMP(3)),并定义水印:允许数据延迟 5 秒event_time AS TO_TIMESTAMP_LTZ(ts, 3),  -- 转换为带时区的时间戳WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 水印规则
) WITH ('connector' = 'kafka','topic' = 'user_behavior_topic',  -- Kafka 主题'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka 地址'properties.group.id' = 'flink_sql_group',  -- 消费者组'scan.startup.mode' = 'earliest-offset',  -- 从最早位置读取'format' = 'json',  -- 数据格式:JSON'json.fail-on-missing-field' = 'false',  -- 字段缺失不报错'json.ignore-parse-errors' = 'true'  -- 解析错误忽略
);

  • 水印(WATERMARK)作用:流数据可能乱序,水印用于标记 “当前已处理到的时间”,超过水印时间的数据会被视为 “迟到数据”(可配置处理策略)。
示例 3:创建一个 MySQL 汇表(写入结果)

将计算结果写入 MySQL 的 user_behavior_result 表:

CREATE TABLE mysql_sink (user_id BIGINT,behavior_count INT,PRIMARY KEY (user_id) NOT ENFORCED  -- MySQL 主键(Flink 不强制校验)
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/test',  -- MySQL 地址'table-name' = 'user_behavior_result',  -- 目标表名'username' = 'root',  -- 用户名'password' = '123456',  -- 密码'sink.buffer-flush.max-rows' = '100',  -- 累计 100 行批量写入'sink.buffer-flush.interval' = '10s'  -- 10 秒批量写入(取两者先满足的)
);

五、基础查询(SELECT 语句)

Flink SQL 的查询语法和标准 SQL 几乎一致,支持投影、过滤、聚合等操作,流批场景通用。

1. 简单投影与过滤

查询 user_behavior 表中 “购买(buy)” 行为的用户 ID 和商品 ID:

SELECT user_id, item_id 
FROM user_behavior 
WHERE behavior = 'buy';

  • 批处理场景:执行后直接返回文件中所有符合条件的结果。
  • 流处理场景(如查询 kafka_user_behavior):会持续监听 Kafka 新数据,每来一条符合条件的数据就输出一次。
2. 基本聚合(GROUP BY)

统计每个用户的行为总数(批处理):

SELECT user_id, COUNT(*) AS behavior_count 
FROM user_behavior 
GROUP BY user_id;

  • 流处理中的聚合:默认是 “无界聚合”(持续更新结果),例如 Kafka 中每新增一条数据,都会重新计算该用户的总数并输出。
3. 写入结果到汇表

将上述聚合结果写入 MySQL 汇表 mysql_sink

INSERT INTO mysql_sink
SELECT user_id, COUNT(*) AS behavior_count 
FROM user_behavior 
GROUP BY user_id;

  • 批处理:执行后一次性写入所有结果,任务结束。
  • 流处理:持续计算并写入最新结果(MySQL 会根据主键更新数据)。

六、视图操作

视图用于简化复杂查询,例如频繁查询 “购买行为且用户 ID> 1000” 的数据,可创建视图:

-- 创建视图
CREATE VIEW buy_behavior_view AS
SELECT user_id, item_id, ts 
FROM user_behavior 
WHERE behavior = 'buy' AND user_id > 1000;-- 查询视图(和查表格一样)
SELECT * FROM buy_behavior_view;-- 删除视图
DROP VIEW IF EXISTS buy_behavior_view;

七、表管理命令

命令作用示例
SHOW TABLES;查看当前 Catalog 中的所有表SHOW TABLES;
DESCRIBE [表名];查看表的结构(字段、类型、水印等)DESCRIBE user_behavior;
DROP TABLE [IF EXISTS] 表名;删除表(仅删除元数据,不删除外部数据)DROP TABLE IF EXISTS user_behavior;

八、执行模式切换(流 / 批)

Flink SQL 默认是 “流处理模式”,可通过配置切换为 “批处理模式”:

-- 切换为批处理模式
SET execution.runtime-mode = 'batch';-- 切换回流处理模式
SET execution.runtime-mode = 'streaming';
  • 批处理模式:适合处理有界数据(如文件),任务执行完自动结束。
  • 流处理模式:适合处理无界数据(如 Kafka),任务持续运行,直到手动停止。

第一次总结

本次讲解了 Flink SQL 的基础概念(表、视图、连接器等)、环境搭建、数据类型、表的创建(源表 / 汇表 / 水印)、基础查询(SELECT/INSERT)、视图操作和表管理命令。这些是使用 Flink SQL 的核心基础,掌握后可处理简单的流批数据场景。

下一次会深入讲解复杂操作:窗口计算(TUMBLE/HOP/SESSION)、流表 JOIN、函数(内置 / 自定义)、CDC 同步(变更数据捕获)等进阶内容。

http://www.xdnf.cn/news/17699.html

相关文章:

  • Dify入门指南(2):5 分钟部署 Dify:云服务 vs 本地 Docker
  • Speech Databases of Typical Children and Children with SLI 数据集解读
  • Vue 中的 Class 与 Style 绑定详解1
  • 数据类型 string
  • MCU中的存储器映射(Memory Map)
  • 【CF】Day125——图论三题
  • 训推一体 | 暴雨X8848 G6服务器 x Intel®Gaudi® 2E AI加速卡
  • C语言变量的声明和定义有什么区别?
  • 图生视频实战:用[灵龙AI API]玩转AI生成视频 – 第2篇,从静图到大片
  • 关于linux系统编程2——IO编程
  • 【Docker实战进阶】Docker 实战命令大全
  • AI基础与实践专题:PyTorch实现线性回归
  • 【unity实战】在Unity中实现不规则模型的网格建造系统(附项目源码)
  • 【实用案例】录音分片上传的核心逻辑和实现案例【文章附有代码】
  • Godot ------ 平滑拖动03
  • SpringBoot 自动配置核心机制(面试高频考点)
  • Orange的运维学习日记--38.MariaDB详解与服务部署
  • JavaEE 初阶第十七期:文件 IO 的 “管道艺术”(下)
  • 《范仲淹传》读书笔记与摘要
  • 使用frp内网穿透实现远程办公
  • 基于AI量化模型的比特币周期重构:传统四年规律是否被算法因子打破?
  • Python(9)-- 异常模块与包
  • AI Coding 概述及学习路线图
  • Elasticsearch Node.js 客户端的安装
  • 【功能测试】软件集成测试思路策略与经验总结
  • FFmpeg - 基本 API大全(视频编解码相关的)
  • 【数据结构】深入理解顺序表与通讯录项目的实现
  • leetcode-hot-100 (图论)
  • CobaltStrike的搭建和使用
  • 爬虫与数据分析实战