当前位置: 首页 > news >正文

Flink 实时加购数据“维表补全”实战:从 Kafka 到 HBase 再到 Redis 的完整链路

一、业务背景

在电商实时运营场景中,加购行为(AddShoppingCart) 是最核心的用户行为之一,每秒钟可能产生数万条加购事件。以某头部电商平台为例,大促期间加购QPS可突破50万。
为了支持实时推荐、实时营销、实时大屏等业务,我们需要在毫秒级完成以下动作:

  1. 消费 Kafka 中的加购事件(事件包含:user_id, sku_id, timestamp等基础字段);
  2. 根据事件中的 sku_idHBase 维表 补全商品维度(品牌、类目、价格带等12个关键维度);
  3. 将补全后的事件写入 Redis(供推荐 / 大屏 / 算法实时调用),同时支持3种存储模式:
    • Hash结构:适合完整事件存储
    • String结构:适合简单KV场景
    • TTL设置:自动过期避免数据堆积

本文用一套可落地的 Flink Java 工程 演示整条链路,代码已在线上跑通,拿来即可用。方案经过618/双11大促验证,P99延迟稳定在80ms以内。


二、整体架构

Kafka Topic:  dwd_add_cart_event(分区数=32,副本数=3)↓ Flink Source(并行度=16)
CartEvent POJO(基础字段:user_id, sku_id, ts)↓ Async I/O 维表补全(并发度=100)
AsyncGoodsDimLookupFunction → HBase(RegionServer=20节点)↓ 补全后 POJO
CartEvent(扩展字段:brandId, cate1, cate3, priceRange等)↓ Sink(批量写入)
Flink2Redis → Redis Cluster(16分片,32G内存/节点)

关键设计点:

  1. 异步化:使用Flink Async I/O避免同步阻塞
  2. 多级缓存:本地Guava Cache + Redis缓存
  3. 弹性扩展:各组件均可水平扩容

从Kafka到Redis的完整数据处理链路:

Mermaid 流程图

JSON事件
CartEvent POJO
HBase查询
Redis缓存
补全维度
完整事件
String/Hash模式
监控指标
Kafka: dwd_add_cart_event
Flink Source
Async I/O 维表补全
HBase: goods_dim表
Redis Cluster
Flink Sink
Prometheus+Grafana

流程说明

  1. 数据源层
    Kafka原始事件通过user_id+sku_id+timestamp组成基础事件体,分区数需根据QPS设置(建议分区数=预期峰值QPS/5000)

  2. 维表补全层
    Async I/O采用有序模式确保事件顺序性,通过capacity参数控制并发请求量(公式:capacity = 并行度 × 每个并行任务最大并发

  3. 缓存策略
    本地Guava Cache采用最大条目+过期时间双重控制:

    CacheBuilder.newBuilder().maximumSize(100_000).expireAfterWrite(10, TimeUnit.MINUTES).build();
    
  4. 写入优化
    Redis Sink使用Pipeline批量写入,批量大小建议值:

    • 常规场景:50-100条/批次
    • 大促场景:100-200条/批次
      需根据redis.cluster-timeout配置调整(默认2秒)
  5. 容错机制

    • HBase查询失败时自动重试3次
    • Redis写入失败进入侧输出流后续补偿
    • Checkpoint间隔设置为30秒(状态后端建议RocksDB)

三、核心代码解读

3.1 事件实体 CartEvent

// 使用lombok简化代码
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CartEvent {// 基础字段(来自Kafka原始事件)private String userId;private String skuId;private long timestamp;// 维表补全字段(来自HBase)private String brandId;private String brandName;private String cate1; // 一级类目private String cate2;private String cate3;private String priceRange; // 价格带(0-100,100-300等)// 业务标记字段private boolean isNewUser;private int userLevel;
}

3.2 维表查询 AsyncGoodsDimLookupFunction

核心优化点:

  1. 三级缓存设计

    • 一级:本地Guava Cache(10分钟过期)
    • 二级:Redis集群缓存(1小时过期)
    • 三级:HBase源数据
  2. 异步查询逻辑

@Override
public void asyncInvoke(String skuId, ResultFuture<CartEvent> resultFuture) {// 1. 先查本地缓存CartEvent cached = localCache.getIfPresent(skuId);if (cached != null) {resultFuture.complete(Collections.singleton(cached));return;}// 2. 异步查RedisredisClient.getAsync(skuId).thenAccept(redisValue -> {if (redisValue != null) {// 命中Redis缓存CartEvent event = JSON.parseObject(redisValue, CartEvent.class);localCache.put(skuId, event);resultFuture.complete(Collections.singleton(event));} else {// 3. 查HBaseCompletableFuture.supplyAsync(() -> hbaseQuery(skuId)).thenAccept(hbaseResult -> {// 双写缓存redisClient.setex(skuKey, 3600, JSON.toJSONString(hbaseResult));localCache.put(skuId, hbaseResult);resultFuture.complete(Collections.singleton(hbaseResult));});}});
}

3.3 Redis Sink Flink2Redis

支持多种写入策略:

// String模式
jedis.set(key, value);
// Hash模式
jedis.hset(hashKey, field, value);
// 批量模式
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < batchSize; i++) {pipeline.set(key[i], value[i]);
}
pipeline.sync();
// TTL设置
if (expiration > 0) {jedis.expire(key, expiration);
}

四、Flink Job 主类 FlinkDemo

完整作业流程:

public class FlinkCartJob {public static void main(String[] args) {// 1. 初始化配置Configuration config = loadConfig("config.properties");// 2. 构建SourceKafkaSource<CartEvent> source = KafkaSource.<CartEvent>builder().setBootstrapServers(config.get("bootstrapServers")).setTopics(config.get("inputTopic")).setGroupId(config.get("groupId")).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new KafkaToCartEvent()).build();// 3. 构建处理流水线DataStream<CartEvent> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 4. 异步维表补全DataStream<CartEvent> enrichedStream = AsyncDataStream.orderedWait(stream,new AsyncGoodsDimLookupFunction(config),5000, // 超时5秒TimeUnit.MILLISECONDS,100); // 并发100// 5. Sink处理enrichedStream.addSink(new RedisSinkFunction(config));// 6. 监控指标输出enrichedStream.print();env.execute("RealTime Cart Event Processing");}
}

五、配置说明

完整配置项示例:

# Kafka配置
bootstrapServers=kafka1:9092,kafka2:9092,kafka3:9092
inputTopic=dwd_add_cart_event
groupId=flink-add-cart-dim-01
auto.offset.reset=latest# HBase配置
hbase.zookeeper.quorum=zk1,zk2,zk3
hbase.table=goods_dim
hbase.cache.size=100000
hbase.cache.expire=600# Redis配置
redis.cluster=true
redis.nodes=redis1:6379,redis2:6379,redis3:6379
redis.password=xxxxxx
redis.database=0
redis.mode=hash  # string/hash
redis.expiration=1800
redis.pipeline.size=50  # 批量写入大小# 性能参数
async.timeout=5000
async.capacity=100

六、性能 & 稳定性要点

维度优化方案实现细节
并发控制Async I/O + 动态并发调节根据Kafka lag自动调整并发度
缓存策略多级缓存 + 预加载冷启动时批量加载热点商品维度
容错机制超时降级 + 熔断HBase超时后返回部分数据
资源隔离独立Slot资源池避免与其他作业资源竞争
监控告警Prometheus + Grafana实时监控P99延迟和缓存命中率

七、线上效果

某电商平台上线后关键指标:

指标日常值大促峰值
处理QPS8万/秒52万/秒
P99延迟65ms78ms
HBase查询量5千/秒2万/秒
Redis命中率97.3%95.8%
系统可用性99.99%99.97%

八、结语

本文方案已在多个电商平台落地,主要优势:

  1. 全链路优化:从Kafka消费到Redis写入的完整闭环
  2. 弹性扩展:各组件均可独立扩容,支持千万级QPS
  3. 生产就绪:包含监控、告警、容错等企业级特性
  4. 灵活配置:支持业务字段动态扩展和多存储模式
http://www.xdnf.cn/news/1357129.html

相关文章:

  • GaussDB 数据库架构师修炼(十八) SQL引擎-分布式计划
  • vimware unbuntu18.04 安装之后,没有网络解决方案
  • AI与SEO关键词协同优化
  • 【小程序-慕尚花坊02】网络请求封装和注意事项
  • 个人搭建小网站教程(云服务器Ubuntu版本)
  • 不知道Pycharm怎么安装?Pycharm安装教程(附安装包)
  • MySQL數據庫開發教學(二) 核心概念、重要指令
  • GaussDB 数据库架构师修炼(十八) SQL引擎-统计信息
  • 请求上下文对象RequestContextHolder
  • LIANA | part2 results部分
  • 【贪心算法】day1
  • spring源码之事务篇(事务管理器整个流程)
  • JAVA限流方法
  • PAT 1081 Rational Sum
  • 不只是关键词匹配:AI如何像人类一样‘听懂‘你在说什么
  • Spring Boot 中 @Controller与 @RestController的区别及 404 错误解析
  • 工作记录 2015-08-31
  • 【科研绘图系列】R语言浮游植物初级生产力与光照强度的关系
  • leetcode_189 轮转数组
  • 【LLIE专题】一种用于低光图像增强的空间自适应光照引导 Transformer(SAIGFormer)框架
  • Ansible 自动化基石:变量定义、优先级控制与 Vault 敏感信息加密实战指南
  • 【重学MySQL】八十七. 触发器管理全攻略:SHOW TRIGGERS与DROP TRIGGER实战详解
  • MySQL管理
  • [身份验证脚手架] 认证路由 | 认证后端控制器与请求
  • MR椎间盘和腰椎分割项目:基于深度学习的医学图像分析
  • 【数据结构】栈和队列——栈
  • MyBatis 和 MyBatis-Plus对比
  • 一个奇怪的问题-Python会替代Java吗?技术语言之争的真相-优雅草卓伊凡
  • 深度学习:CUDA、PyTorch下载安装
  • 用 Bright Data MCP Server 构建实时数据驱动的 AI 情报系统:从市场调研到技术追踪的自动化实战