当前位置: 首页 > news >正文

Flink实时流量统计:基于窗口函数与Redis Sink的每小时PV监控系统(学习记录)

题目:

利用flink统计网站浏览量,并写入redis。

利用窗口函数以及算子实现每小时PV(网站的页面浏览量)统计,对统计后结果数据格式进行设计,存储至Redis中(利用sink将处理后结果数据输出到redis数据库中)。

操作步骤:

1.redis在虚拟机上的安装

(1)下载redis安装包

cd /opt/software # 进入一个用于存放安装包的目录,可自行选择

wget http://download.redis.io/releases/redis-6.2.6.tar.gz

# 下载Redis 6.2.6版本,可根据需求更换版本号

(2)解压安装包——使用tar命令解压下载好的安装包

tar -zxvf redis-6.2.6.tar.gz

解压后会生成一个名为redis-6.2.6的目录。

(3)安装gcc编译工具

yum install -y gcc gcc-c++ make

(4)编译和安装 Redis

cd redis-6.2.6

make # 编译Redis,此过程可能需要一些时间,取决于虚拟机性能

make install # 安装Redis,默认会安装到/usr/local/bin目录下

(5)配置redis—— Redis 默认没有生成配置文件,需要手动创建相关目录和文件:

mkdir /etc/redis # 创建存放配置文件的目录

cp /opt/redis-6.2.6/redis.conf /etc/redis/ # 将示例配置文件复制到新目录下,路径根据实际解压位置调整

如图所示,redis安装成功并且进行了正常访问。

此处还需要关闭虚拟机上的防火墙,使redis的6379端口可以被正常访问。

2.代码展示

项目创建

在 IntelliJ IDEA 中,选择File -> New -> Project,然后选择Maven,按照向导创建一个新的 Maven 项目。在pom.xml文件中添加以下依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-pv-redis</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><redis.clients.version>3.8.0</redis.clients.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- Flink 核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><!-- Flink Redis 连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><!-- Redis 客户端依赖 --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis.clients.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><!-- 阿里云镜像 --><repositories><repository><id>aliyunmaven</id><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></repository></repositories><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin></plugins></build>
</project>

定义数据类型

创建一个 Java 类来表示UserBehavior,UserBehavior.csv文件的每一行包含userId、behavior等五个字段,以逗号分隔。

package Bean;public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;  // 行为类型:"pv"为页面浏览private Long timestamp;   // 时间戳(毫秒)public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public Long getItemId() {return itemId;}public void setItemId(Long itemId) {this.itemId = itemId;}public Integer getCategoryId() {return categoryId;}public void setCategoryId(Integer categoryId) {this.categoryId = categoryId;}public String getBehavior() {return behavior;}public void setBehavior(String behavior) {this.behavior = behavior;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}}

编写Flink程序

创建一个主类,比如PvStatisticsToRedis.java,编写 Flink 程序来统计每小时 PV 并写入 Redis。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.connector.redis.sink.RedisSink;
import org.apache.flink.connector.redis.sink.RedisSinkFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;public class PvStatisticsToRedis {public static void main(String[] args) throws Exception {// 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 CSV 文件读取数据,假设第一行是表头,这里简单跳过DataStream<UserBehavior> userBehaviorDataStream = env.readTextFile("/mnt/UserBehavior(1).csv").skip(1).map(new MapFunction<String, UserBehavior>() {@Overridepublic UserBehavior map(String line) throws Exception {String[] parts = line.split(",");long userId = Long.parseLong(parts[0]);String behavior = parts[1];return new UserBehavior(userId, behavior);}});// 过滤出 pv 行为的数据SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream.filter(behavior -> "pv".equals(behavior.getBehavior())).map(behavior -> Tuple2.of("pv", behavior.getUserId())).returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照 "pv" 进行分组,并使用滑动窗口统计每小时的 PV 数量KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);SingleOutputStreamOperator<Map<String, Object>> pvCountStream = (SingleOutputStreamOperator<Map<String, Object>>) keyedStream.window(TumblingProcessingTimeWindows.of(Time.hours(1))).process(new KeyedProcessFunction<String, Tuple2<String, Long>, Map<String, Object>>() {private transient HashSet<Long> userIds;@Overridepublic void open(Configuration parameters) throws Exception {userIds = new HashSet<>();}@Overridepublic void processElement(Tuple2<String, Long> value, Context ctx, Collector<Map<String, Object>> out) throws Exception {userIds.add(value.f1);Map<String, Object> result = new HashMap<>();result.put("window_end_time", ctx.timerService().currentProcessingTime());result.put("pv_count", userIds.size());out.collect(result);}@Overridepublic void close() throws Exception {userIds.clear();}});// 将统计结果转换为字符串格式SingleOutputStreamOperator<String> resultStream = pvCountStream.map(new MapFunction<Map<String, Object>, String>() {@Overridepublic String map(Map<String, Object> value) throws Exception {return "window_end_time: " + value.get("window_end_time") + ", pv_count: " + value.get("pv_count");}});// 配置 Redis 连接FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 创建 Redis SinkRedisSinkFunction<String> redisSinkFunction = new RedisSinkFunction<>(conf, new RedisMapper<String>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH, "pv_statistics");}@Overridepublic String getKeyFromData(String data) {return null;}@Overridepublic String getValueFromData(String data) {return data;}});RedisSink<String> redisSink = new RedisSink<>(redisSinkFunction);// 将结果写入 RedisresultStream.addSink(redisSink);// 打印结果到控制台(可选)resultStream.addSink(new PrintSinkFunction<>());// 执行 Flink 任务env.execute("PV Statistics to Redis");}
}

资源文件中加载日志log4j.poverities,代码如下:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

运行效果:

3.sink输出到redis数据库

核心原理

Flink 的RedisSink通过自定义RedisMapper实现数据写入 Redis,主要需指定:

Redis 命令:如SET(存储键值对)、HSET(存储哈希)、LPUSH(存储列表)等。

键(Key):用于标识数据的唯一性(如按小时的 PV 统计可将window_end_time作为 Key)。

值(Value):需要存储的具体统计结果(如 PV 数量)。

具体实现步骤

假设统计结果为每小时的 PV 数,设计存储格式如下:

Redis 键(Key):pv_statistics:{window_end_time}(其中window_end_time为窗口结束时间戳,精确到小时)。

Redis 值(Value):该小时的 PV 总数(如12345)。

数据结构:采用String类型(通过SET命令存储),便于后续查询和聚合。

RedisMapper是 Flink 与 Redis 交互的核心接口,需实现 3 个方法:

getCommandDescription():指定 Redis 命令(如SET)。

getKeyFromData():从统计结果中提取 Redis 的 Key。

getValueFromData():从统计结果中提取 Redis 的 Value。

代码实现

RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");}@Overridepublic String getKeyFromData(PvResult data) {SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");String key = sdf.format(new Date(data.getWindowStart()));System.out.println("Redis Key: " + key);return key;}@Overridepublic String getValueFromData(PvResult data) {return String.valueOf(data.getCount());}
};
package Bean;
public class PvResult {private long windowStart; // 窗口开始时间(毫秒)private long windowEnd;   // 窗口结束时间(毫秒)private long count;       // 该窗口的 PV 数public PvResult() {}public PvResult(long windowStart, long windowEnd, long count) {this.windowStart = windowStart;this.windowEnd = windowEnd;this.count = count;}// Getters & Setterspublic long getWindowStart() { return windowStart; }public void setWindowStart(long windowStart) { this.windowStart = windowStart; }public long getWindowEnd() { return windowEnd; }public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }public long getCount() { return count; }public void setCount(long count) { this.count = count; }
}

配置 Redis 连接

通过FlinkJedisPoolConfig配置 Redis 连接信息(如主机、端口、密码等):

FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.100.20") // 虚拟机Redis IP.setPort(6379).build();

注意,这里连接的是虚拟机上的redis,需要更改为虚拟机上的IP地址。

效果展示:

http://www.xdnf.cn/news/1147627.html

相关文章:

  • rust实现的快捷补全到剪贴板的实用工具
  • Zara和网易云音乐仿写总结
  • 【c++】提升用户体验:问答系统的交互优化实践——关于我用AI编写了一个聊天机器人……(12)
  • 使用 Gunicorn 部署 Django 项目
  • AI编程工具对比:Cursor、GitHub Copilot与Claude Code
  • Oracle Database 23ai 技术细节与医疗 AI 应用
  • Lock4j 使用说明
  • 【Linux服务器】-mysql数据库数据目录迁移
  • 安全事件响应分析--基础命令
  • 【机器学习深度学习】为什么要将模型转换为 GGUF 格式?
  • [MarkdownGithub] 使用块引用高亮显示“注意“和“警告“和其他注意方式的选项
  • 删除debian xdm自启动ibus的配置项
  • Private Equity(PE)Investment Banking(IB)
  • 拉普拉斯方程极坐标解法
  • 万字解析LVS集群
  • CAN通信驱动开发注意事项
  • Django母婴商城项目实践(六)- Models模型之ORM操作
  • undefined reference to ‘end‘
  • webstorm的内置命令行工具没办法使用了怎么办
  • CSS-in-JSVue的解决方案
  • 深入理解DNS原理与服务的详细配置
  • 传统行业和AIGC的结合及应用
  • 计算机视觉:AI 的 “眼睛” 如何看懂世界?
  • 让 Windows 用上 macOS 的系统下载与保姆级使用教程
  • Spring Cloud Gateway与Envoy Sidecar在微服务请求路由中的架构设计分享
  • 云服务器磁盘IO性能优化的测试与配置方法
  • 大模型 Function Call 的实现步骤及示例详解
  • 6 STM32单片机的智能家居安防系统设计(STM32代码+手机APP设计+PCB设计+Proteus仿真)
  • 【Qt开发】Qt的背景介绍(三)-> 认识Qt Creator
  • 【C# in .NET】20. 探秘静态类:抽象与密封的结合体