当前位置: 首页 > ds >正文

flink超时未揽收单量统计

应用场景: 双十一大屏统计 - - 订单超时汇总

项目指标概况:

应用背景:晚点超时指标,例如:出库超6小时未揽收订单量

难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标恰好是要求在指定的超时时刻计算出有多少“未到达”的消息,可以预警出订单积压等异常现象

方案1:flink往db里面高TPS写,产品前端高RPS查询OLAP数据库明细,大促数据洪峰场景因查询暴增会使得数据库压力打满,明细查询的方式势必不能支持日后大促暴涨的单量

方案2:metaQ定时消息,订单消息写入metaQ,利用metaQ的定时消息功能,根据用户写入的消息和时间,在指定时刻下发,flink接收两个数据源(kafka订单流,kafka出库流,metaQ延时消息流)判断订单是否超时揽收,这种方式除了需要维护flink程序,同时还要保障额外的消息中间层维护

方案3:flinkcep,使用起来确实比较简便,但是实际在统计上和真实结果有一定出入,原因是出库时间会被回传多次,开始回传的是9点,后面发现回传错了,又改成了8点,而cep的watermark是全局向前走的,对于这种场景,无法很好的适配

方案4:flink的processfunction,是一个low-level流处理操作,通过改写其中的Processelement方法,可以告诉flinkstate里面存什么,以及如何更新state。通过改写ontimer方法,可以告诉state何时下发超时消息

具体操作:

1.首先,根据业务主键物流订单code将消息做keyby处理,不同主键值的消息分流到不同的partition里面,生成keyedstream,因为在后续processfuntion中操作的state是valuestate类型的,即每一个key值对应一个state,更新是以key粒度(一个物流订单)进行的

2.每一条消息在processfuntion中处理时,为每个key的消息计算出timeoutmemont,并将该时刻注册到timeservice的定时器中,同时存储该消息至state,当同一个key值有多条消息到来时,可根据消息状态对state进行更新

3.当机器时间来到timeoutmemont时,timeserivcr中的定时器会自动回调ontimer函数,我们事先已经在ontimer函数中定义好操作:获取state,并判断标志位进行下发

如此一来,便做到了:制造出超时消息,并将其暂存在flink state中

该方案优势:

1.部署,运维成本比较低,不需要引入额外的消息中间件;

2.性能优良:

source rps:avg 3k/s,max:4.5k/s

sink rps:avg 1.5k/s,max:2.4k/s

延迟:avg:2.5/s,max:3.7s

3.通用化,复用性高,可以复用到各类业务场景,只需要修改下配置(超时时间)

1. 超时检测逻辑(改写ProcessFunction)

public class TimeoutDetector extends KeyedProcessFunction<String, Row, Tuple2<String, Long>> {private ValueState<Long> createTimeState;private ValueState<Boolean> scanFlagState;@Overridepublic void open(Configuration parameters) {// 初始化出库时间状态createTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("createTime", Long.class));// 初始化揽收标记状态scanFlagState = getRuntimeContext().getState(new ValueStateDescriptor<>("scanFlag", Boolean.class));}@Overridepublic void processElement(Row row, Context ctx, Collector<Tuple2<String, Long>> out) {String eventType = row.getFieldAs("event_type");Long eventTs = ((Timestamp)row.getFieldAs("ts")).getTime();if ("create".equals(eventType)) {// 记录出库时间并注册定时器createTimeState.update(eventTs);ctx.timerService().registerEventTimeTimer(eventTs + 6 * 3600 * 1000);} else if ("scan".equals(eventType)) {// 标记已揽收scanFlagState.update(true);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) {if (scanFlagState.value() == null || !scanFlagState.value()) {// 输出超时订单ID与出库时间out.collect(Tuple2.of(ctx.getCurrentKey(), createTimeState.value()));}// 清理状态createTimeState.clear();scanFlagState.clear();}
}

在flinksql中调用udf

CREATE TEMPORARY FUNCTION CheckTimeout AS 'com.example.TimeoutDetector';SELECT order_id,ship_time,CheckTimeout(order_id, ship_time) AS is_timeout
FROM (SELECT order_id,event_time AS ship_timeFROM order_eventsWHERE event_type = 'SHIP'
) 
WHERE is_timeout = true;

doris sink 

INSERT INTO doris_sink_table
SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,COUNT(order_id) AS timeout_count
FROM timeout_orders
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

方案3

flinkcep处理:

-- 步骤1:定义数据源表(Kafka输入)
CREATE TABLE order_events (order_id         STRING,event_type       STRING,   -- 'outbound'=出库, 'collection'=揽收event_time       TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 允许5秒乱序
) WITH ('connector' = 'kafka','topic'     = 'order_events','scan.startup.mode' = 'latest-offset','format'    = 'json'
);-- 步骤2:使用 MATCH_RECOGNIZE 进行超时模式匹配
CREATE TABLE timeout_orders (order_id         STRING,outbound_time    TIMESTAMP(3),timeout_reason   STRING
) WITH ('connector' = 'kafka','topic'     = 'timeout_alerts','format'    = 'json'
);INSERT INTO timeout_orders
SELECT order_id, outbound_time, '未在6小时内揽收' AS timeout_reason
FROM order_events
MATCH_RECOGNIZE (PARTITION BY order_idORDER BY event_timeMEASURESA.event_time AS outbound_time,LAST(B.event_time) AS collection_timeONE ROW PER MATCHAFTER MATCH SKIP TO LAST BPATTERN (A NOT? B*) WITHIN INTERVAL '6' HOUR  -- 超时窗口定义DEFINEA AS event_type = 'outbound',B AS event_type = 'collection' AND B.event_time <= A.event_time + INTERVAL '6' HOUR
)
WHERE collection_time IS NULL;  -- 未匹配到揽收事件

关键设计解析

  1. 时间语义处理

    • WATERMARK 定义处理5秒内的乱序事件,与物流场景常见的网络延迟匹配 
    • WITHIN INTERVAL '6' HOUR 精确控制超时窗口,符合出库后6小时未揽收的业务规则 
  2. 模式匹配逻辑

    • PATTERN (A NOT? B*) 表示匹配出库事件后未出现揽收事件(NOT操作符)
    • DEFINE B 中增加时间约束,确保揽收事件在出库后6小时内发生
  3. 结果处理优化

    • ONE ROW PER MATCH 减少重复告警,每个订单仅触发一次超时事件
    • AFTER MATCH SKIP TO LAST B 跳过已处理事件,提升处理效率 

优化方式:

1.rocksdb statebackend RocksDB 将状态存储在磁盘而非内存,适合 TB 级状态;增量检查点仅保存差异数据,减少 IO 压力。

2.大字段/无关字段去除

3.statettl=13h(12h+1h缓冲)自动清理已完成窗口的过期状态,避免状态无限膨胀

4.检查点间隔与异步快照,增大checkpoint时间间隔

SET 'execution.checkpointing.interval' = '10min';  -- 增大间隔降低IO压力
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.unaligned' = 'true';  -- 启用非对齐检查点

三、技术优化策略

  1. 状态管理优化

    • 启用RocksDB状态后端:state.backend: rocksdb
    • 设置TTL自动清理过期订单状态:table.exec.state.ttl = 2h
  2. 性能调优

    • 调整并行度:SET 'parallelism.default' = 8;
    • 启用MiniBatch聚合:table.exec.mini-batch.enabled = true
  3. 容错机制


 

http://www.xdnf.cn/news/4799.html

相关文章:

  • 【AI提示词】马斯洛需求分析专家
  • 【Linux】socket网络编程之UDP
  • cursor平替,试试 vscode+cline+openrouter 的方案,还能自定义 mcp-server 教程大纲
  • 【算法-链表】链表操作技巧:常见算法
  • 元宇宙虚拟展厅如何制作?
  • 华为网路设备学习-21 路由过滤(filter-policy)
  • 微服务不注册到nacos的方法
  • HTML9:页面结构分析
  • 初识Linux · 传输层协议TCP · 上
  • 坐标系与坐标系数转换
  • 接口自动化测试框架详解(pytest+allure+aiohttp+ 用例自动生成)
  • 一文读懂Nginx应用之 HTTP负载均衡(七层负载均衡)
  • 记录微信小程序掉起半屏失效问题
  • 带你玩转 Flink TumblingWindow:从理论到代码的深度探索
  • 使用 CDN 在国内加载本地 PDF 文件并处理批注:PDF.js 5.x 实战指南
  • QMK键盘固件自定义指南 - 打造你的专属键盘体验
  • Python开发之os.path的常用操作
  • Vim 编辑器常用快捷键速查表
  • 排序算法——基数排序
  • 函数级重构:如何写出高可读性的方法?
  • 生产型机器学习系统:静态训练与动态训练的权衡与实践
  • mobile自动化测试-appium webdriverio
  • element-ui form 组件源码分享
  • 2025.5.8总结(中期审视)
  • JAVA多线程进阶
  • 第五十四篇 AI与数据分析
  • 推测式思维树:让大模型快速完成复杂推理
  • 针对共享内存和上述windows消息机制 在C++ 和qt之间的案例 进行详细举例说明
  • Android7 Input(六)InputChannel
  • OpenHarmony平台驱动开发(九),MIPI DSI