
Flink SQL: Counting Records per Minute or per Day

1. Environment Versions

Component   Version
Flink       1.17.0
Kafka       2.12
MySQL       5.7.33

[Note] Flink 1.13 introduced the cumulate window. Before that, Flink SQL had no trigger mechanism, so a long window could not fire mid-window and emit intermediate results, for example updating the running PV and UV every 10 seconds. The only way to do that was a Trigger combined with State in the DataStream API; see:
Flink DataStream: Counting Records per Minute or per Day
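To make the cumulate-window semantics concrete, here is a small standalone sketch (plain Java, not Flink code; the window math is an assumption based on the documented CUMULATE semantics, with windows aligned to the max size): all windows share one start and grow by the step until they reach the max size, and an event is counted in every window whose end lies after its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class CumulateWindows {
    /**
     * For a cumulate window with the given step and max size (both in ms),
     * return the [start, end) windows that contain the event time.
     */
    static List<long[]> windowsFor(long eventMs, long stepMs, long maxSizeMs) {
        long start = eventMs - (eventMs % maxSizeMs);   // aligned shared window start
        List<long[]> windows = new ArrayList<>();
        for (long end = start + stepMs; end <= start + maxSizeMs; end += stepMs) {
            if (end > eventMs) {                        // event lies inside [start, end)
                windows.add(new long[]{start, end});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // event at t=25s, step=10s, max size=60s
        for (long[] w : windowsFor(25_000L, 10_000L, 60_000L)) {
            System.out.println("[" + w[0] / 1000 + "s, " + w[1] / 1000 + "s)");
        }
        // the event is counted in every window ending after 25s:
        // [0s,30s) [0s,40s) [0s,50s) [0s,60s)
    }
}
```

This is why a one-day cumulate window with a 10-second step emits an updated day-to-date PV/UV every 10 seconds.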

2. MySQL DDL

create table user_log
(
    id      int auto_increment comment 'primary key' primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user action',
    logtime bigint null comment 'log time'
) comment 'user log table, used as the verification data source';

3. User Log Class

Create a new Maven project.

This class defines the schema shared by Kafka and MySQL.

/**
 * User log POJO
 */
@Data
public class UserLog {
    // user uid
    private int uid;
    // user action
    private int event;
    // log time
    private Date logtime;

    // date string, used for per-day aggregation
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // time string truncated to the minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}
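DateUtil above is Hutool's utility class. As a plain-JDK sketch (assuming the same format patterns), the two helpers are equivalent to:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class UserLogFormat {
    // mirrors getFormatDate(): compact day key such as "20250816"
    static String formatDate(Date d) {
        return new SimpleDateFormat("yyyyMMdd").format(d);
    }

    // mirrors getFormatTime(): truncated to the minute, seconds forced to ":00"
    static String formatTime(Date d) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(d) + ":00";
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println(formatDate(now));
        System.out.println(formatTime(now));
    }
}
```

Forcing the seconds to ":00" is what makes every record within the same minute share one key, so grouping by this string yields per-minute counts.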

4. User Data Generator

/**
 * User data generator
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user uid
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user action
                        userLog.setEvent(generator.nextInt(1, 2));
                        // event time jittered within +/- 2 seconds of now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND,
                                generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
//                60 * 60 * 10,
                1200,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // result type: wrap UserLog into TypeInformation
                TypeInformation.of(UserLog.class));
        DataStreamSource<UserLog> dataGeneratorSourceStream =
                env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print generated data
//        dataGeneratorSourceStream.print();

        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder()
                        .setTopic("userLog")
                        .setValueSerializationSchema(
                                (SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                        .build())
                .build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // write to MySQL for later verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build());
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}

5. SQL: PV and UV per Minute or per Day

public class UserLogSql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // create the source table
        String sourceDDL = "create table user_log\n" +
                "(\n" +
                "    uid  INT\n" +
                "    , event INT\n" +
                "    , logtime BIGINT\n" +
                "    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +
                "    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +
                ") with (\n" +
                "      'connector' = 'kafka'\n" +
                "      ,'topic' = 'userLog'\n" +
                "      ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +
                "      ,'scan.startup.mode' = 'latest-offset'\n" +
                "      ,'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // PV and UV per window
        String result = "select\n" +
                " date_format(window_start, 'yyyy-MM-dd') cal_day\n" +
                " , date_format(window_start, 'HH:mm:ss') start_time\n" +
                " , date_format(window_end, 'HH:mm:ss') end_time\n" +
                " , count(uid) pv\n" +
                " , count(distinct uid) uv\n" +
                "FROM TABLE(\n" +
                // fire every 10 seconds, window size one day
//                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +
                // fire every 10 seconds, window size 10 seconds
                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +
                "  GROUP BY window_start, window_end";
        // print the query result
        tableEnv.executeSql(result).print();
    }
}

6. Running the SQL via sql-client

-- DDL
create table user_log
(
    uid     INT,
    event   INT,
    logtime BIGINT,
    rowtime AS TO_TIMESTAMP_LTZ(logtime, 3),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with (
    'connector' = 'kafka',
    'topic' = 'userLog',
    'properties.bootstrap.servers' = 'hadoop01:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- PV/UV query: fires every 10 seconds, window size one day
select
    date_format(window_start, 'yyyy-MM-dd') cal_day,
    date_format(window_start, 'HH:mm:ss') start_time,
    date_format(window_end, 'HH:mm:ss') end_time,
    count(uid) pv,
    count(distinct uid) uv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end;

7. Data Verification

  1. Start UserLogGenerator
  2. Start UserLogSql, or run the SQL in sql-client
  3. Verify with a query in MySQL

Convert the window boundaries to timestamps:

Timestamp   Before conversion      After conversion
w_start     2025-08-16 14:45:40    1755326740000
w_end       2025-08-16 14:45:50    1755326750000
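The conversion can be reproduced with java.time (a sketch; the Asia/Shanghai time zone is an assumption that matches the values above, since logtime stores epoch milliseconds):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EpochMillis {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    // wall-clock string -> epoch milliseconds
    static long toMillis(String ts) {
        return LocalDateTime.parse(ts, FMT).atZone(ZONE).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toMillis("2025-08-16 14:45:40")); // 1755326740000
        System.out.println(toMillis("2025-08-16 14:45:50")); // 1755326750000
    }
}
```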
select count(distinct uid) from user_log where logtime >= 1755326740000 and logtime < 1755326750000;
# consistent with the Flink SQL client output
                              SQL Query Result (Table)
 Refresh: 1 s        Page: Last of 1        Updated: 23:50:09.972

   cal_day   start_time    end_time     pv     uv
2025-08-15     23:45:30    23:45:40     15     15
2025-08-15     23:45:40    23:45:50    101     45
2025-08-15     23:45:50    23:46:00    104     42
2025-08-15     23:46:00    23:46:10    100     42
2025-08-15     23:46:10    23:46:20     97     45
2025-08-15     23:46:20    23:46:30    104     40
2025-08-15     23:46:30    23:46:40     97     42
2025-08-15     23:46:40    23:46:50     99     44
2025-08-15     23:46:50    23:47:00    103     44
2025-08-15     23:47:00    23:47:10     97     44
2025-08-15     23:47:10    23:47:20    100     43

8. Common Problems

  1. Running a query in sql-client fails because the Kafka connector jar is missing
# run a SQL query
Flink SQL> select * from user_log;
# error
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Solution

# download the Kafka connector jar matching your Flink version and put it in Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/

9. References

Real-time historical PV/UV statistics with Flink
Flink Cumulate Window

