
ABP VNext + Apache Flink Real-Time Stream Processing: Building a Highly Available Transaction Risk Control System 🌐


📚 Table of Contents

  • ABP VNext + Apache Flink Real-Time Stream Processing: Building a Highly Available Transaction Risk Control System 🌐
    • 1. Background 🚀
    • 2. Overall System Architecture 🏗️
    • 3. Hands-On 🛠️: Transaction Behavior Alerting System
      • 3.1 Capturing Transaction Events in ABP 📝
        • CAP + Outbox Configuration Example 💼
      • 3.2 Flink CEP Patterns and Exactly-Once ⚡
      • 3.3 Real-Time Push with Redis Stream + SignalR 🔔
    • 4. Production-Grade Deployment and Monitoring 📈
    • 5. Automated Testing 🧪


1. Background 🚀

In high-frequency, highly interactive systems such as finance 💰, e-commerce 🛒, and IoT 🌐, more and more scenarios require problems to be detected and responded to in real time.


2. Overall System Architecture 🏗️

ABP VNext API --(Publish Event)--> Kafka: transactions --(Consume Transaction)--> Flink CEP Job
Flink CEP Job --(Write Alerts)--> PostgreSQL Sink
Flink CEP Job --(Push Alerts)--> Redis Stream --(Read Alerts)--> RiskAlertWorker --(Real-Time Push)--> SignalR Hub

💡 The flow above shows how data moves between the components; the message-based hand-offs decouple the services and keep the pipeline highly available.


3. Hands-On 🛠️: Transaction Behavior Alerting System

3.1 Capturing Transaction Events in ABP 📝

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;

public class TransactionCreatedDomainEvent : DomainEvent
{
    public Guid UserId { get; set; }
    public decimal Amount { get; set; }
    public string Location { get; set; }
}

// Bridges the domain event onto the distributed event bus (Kafka via CAP).
public class TransactionCreatedHandler :
    IDistributedEventHandler<TransactionCreatedDomainEvent>,
    ITransientDependency
{
    private readonly IDistributedEventBus _eventBus;
    private readonly IClock _clock;
    private readonly ILogger<TransactionCreatedHandler> _logger;

    public TransactionCreatedHandler(
        IDistributedEventBus eventBus,
        IClock clock,
        ILogger<TransactionCreatedHandler> logger)
    {
        _eventBus = eventBus;
        _clock = clock;
        _logger = logger;
    }

    public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData)
    {
        var eto = new TransactionCreatedEto
        {
            UserId = eventData.UserId,
            Amount = eventData.Amount,
            Location = eventData.Location,
            OccurredAt = _clock.Now
        };

        try
        {
            await _eventBus.PublishAsync(eto);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish transaction event: {UserId}", eventData.UserId);
            throw;
        }
    }
}
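
The handler publishes a TransactionCreatedEto that the snippet does not define. A minimal sketch of such an ETO, assuming ABP's [EventName] attribute is used to map it to the transactions topic:

using System;
using Volo.Abp.EventBus;

// Hypothetical ETO published to the distributed event bus; [EventName] maps it to the "transactions" topic.
[EventName("transactions")]
public class TransactionCreatedEto
{
    public Guid UserId { get; set; }
    public decimal Amount { get; set; }
    public string Location { get; set; }
    public DateTime OccurredAt { get; set; }
}
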
CAP + Outbox Configuration Example 💼
// appsettings.json
"Cap": {"UseEntityFramework": true,"UseDashboard": true,"Producer": {"Kafka": { "Servers": "localhost:9092" }},"Outbox": { "TableName": "CapOutboxMessages" }
}
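
The JSON above only stores raw settings; CAP itself is registered in code. A minimal sketch of that registration in the ABP module, assuming a DbContext named RiskDbContext and a host module named RiskServiceHostModule (both hypothetical):

using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;

// Hypothetical host module; RiskDbContext is an assumed EF Core DbContext.
public class RiskServiceHostModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();

        context.Services.AddCap(cap =>
        {
            // Keep outbox messages in the same database/transaction as business data.
            cap.UseEntityFramework<RiskDbContext>();

            // Publish integration events to Kafka (servers come from the "Cap" section above).
            cap.UseKafka(configuration["Cap:Producer:Kafka:Servers"]);

            // Optional dashboard for inspecting published/received messages.
            cap.UseDashboard();
        });
    }
}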

3.2 Flink CEP Patterns and Exactly-Once ⚡

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.eventtime._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.util.Collector
import java.time.Duration

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Exactly-once checkpoints every 10 s, kept in RocksDB and retained if the job is cancelled.
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints"))
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// Event-time watermarks tolerating 5 s of out-of-order events.
val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5))
  .withTimestampAssigner(new SerializableTimestampAssigner[Transaction] {
    override def extractTimestamp(event: Transaction, recordTimestamp: Long): Long =
      event.timestamp.toEpochMilli
  })

// `Transaction`, `deserializer` and the Kafka `props` are defined elsewhere.
val stream = env
  .addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props))
  .assignTimestampsAndWatermarks(watermarkStrategy)

// Pattern: a transaction above 10,000 followed, within 5 minutes, by one from a different location.
val pattern = Pattern
  .begin[Transaction]("first").where(_.amount > 10000)
  .next("second").where(new IterativeCondition[Transaction] {
    override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]): Boolean = {
      val first = ctx.getEventsForPattern("first").iterator().next()
      event.location != first.location
    }
  })
  .within(Time.minutes(5))

// Evaluate the pattern per user; timed-out partial matches go to a side output for clean-up.
val timedOutTag = OutputTag[String]("timed-out-risk-candidates")

val alerts = CEP.pattern(stream.keyBy(_.userId), pattern)
  .flatSelect(timedOutTag) {
    (partial: scala.collection.Map[String, Iterable[Transaction]], timestamp: Long, out: Collector[String]) =>
      // timeout clean-up logic
      out.collect(s"timed out at $timestamp for user ${partial("first").head.userId}")
  } {
    (matched: scala.collection.Map[String, Iterable[Transaction]], out: Collector[String]) =>
      val second = matched("second").head
      out.collect(s"risk alert: user ${second.userId}, amount ${second.amount}, location ${second.location}")
  }
Sequence of interactions: the ABP API publishes the transaction event to Kafka, Flink consumes and processes the stream and pushes alerts to Redis, the RiskAlertWorker reads them via StreamReadGroup, and SignalR pushes them to clients.

💡 It is recommended to manage message schemas with a Schema Registry across the whole pipeline to avoid compatibility issues.
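
As an illustration of schema-aware publishing on the .NET side (separate from the CAP pipeline above), a sketch using Confluent.Kafka with the Confluent.SchemaRegistry.Serdes JSON serializer; the broker and registry URLs are assumptions:

using System;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Assumed endpoints; adjust to your environment.
using var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "http://localhost:8081" });

var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    EnableIdempotence = true,   // matches the enable.idempotence=true recommendation in section 4
    Acks = Acks.All
};

// JsonSerializer<T> registers and validates the topic's value schema in the Schema Registry.
using var producer = new ProducerBuilder<string, TransactionCreatedEto>(producerConfig)
    .SetValueSerializer(new JsonSerializer<TransactionCreatedEto>(schemaRegistry))
    .Build();

var eto = new TransactionCreatedEto
{
    UserId = Guid.NewGuid(),
    Amount = 12000m,
    Location = "Shanghai",
    OccurredAt = DateTime.UtcNow
};

await producer.ProduceAsync("transactions", new Message<string, TransactionCreatedEto>
{
    Key = eto.UserId.ToString(),
    Value = eto
});
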


3.3 Real-Time Push with Redis Stream + SignalR 🔔

using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

public class RiskAlertWorker : BackgroundService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IHubContext<RiskAlertHub> _hubContext;
    private readonly ILogger<RiskAlertWorker> _logger;

    public RiskAlertWorker(
        IConnectionMultiplexer redis,
        IHubContext<RiskAlertHub> hubContext,
        ILogger<RiskAlertWorker> logger)
    {
        _redis = redis;
        _hubContext = hubContext;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var db = _redis.GetDatabase();

        // Create the consumer group once; ignore BUSYGROUP if it already exists.
        try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); }
        catch (RedisServerException) { /* ignore BUSYGROUP */ }

        int backoff = 1000;
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // StackExchange.Redis does not support blocking XREADGROUP, so poll and back off when idle.
                var entries = await db.StreamReadGroupAsync(
                    "risk-alerts", "alert-group", "consumer-1", ">", count: 10);

                if (entries.Length == 0)
                {
                    await Task.Delay(500, stoppingToken);
                    continue;
                }

                foreach (var entry in entries)
                {
                    var alert = JsonSerializer.Deserialize<RiskEventDto>(entry["data"].ToString());
                    await _hubContext.Clients.Group(alert.UserId.ToString())
                        .SendAsync("ReceiveAlert", alert, stoppingToken);
                    await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id);
                }

                backoff = 1000;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process Redis alerts");
                await Task.Delay(backoff, stoppingToken);
                backoff = Math.Min(backoff * 2, 16000);
            }
        }
    }
}

[Authorize]
public class RiskAlertHub : Hub { }
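
The worker targets Clients.Group(alert.UserId.ToString()), so each connection has to join a group named after its user id. The hub above is declared empty; a minimal sketch of that group subscription, assuming the user id is available as Context.UserIdentifier from the JWT:

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;

[Authorize]
public class RiskAlertHub : Hub
{
    // Join a per-user group so RiskAlertWorker can push alerts with Clients.Group(userId).
    public override async Task OnConnectedAsync()
    {
        if (!string.IsNullOrEmpty(Context.UserIdentifier))
        {
            await Groups.AddToGroupAsync(Context.ConnectionId, Context.UserIdentifier);
        }

        await base.OnConnectedAsync();
    }

    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        if (!string.IsNullOrEmpty(Context.UserIdentifier))
        {
            await Groups.RemoveFromGroupAsync(Context.ConnectionId, Context.UserIdentifier);
        }

        await base.OnDisconnectedAsync(exception);
    }
}

Alternatively, Clients.User(alert.UserId.ToString()) with the default IUserIdProvider avoids manual group management.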

4. Production-Grade Deployment and Monitoring 📈

Recommended configuration per component:

  • ABP backend: Pod liveness/readiness probes ✅ + HTTPS 🔒 + Serilog → Elasticsearch sink 📝 + CAP Outbox
  • Kafka: enable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐
  • Flink: RocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + state TTL 🕒 + HA 🌟
  • Redis: Redis Cluster 🔄 + AOF persistence 📝 + ACL 🔑 + blocking consumption ⏳
  • PostgreSQL: primary/replica streaming replication 🛠️ + WAL 📜 + TimescaleDB extension 📊
  • SignalR: Azure SignalR ☁️ / Redis backplane 🔄 + JWT authentication 🔏 (see the sketch after this list)
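
For the SignalR row, a minimal sketch of wiring the Redis backplane (Microsoft.AspNetCore.SignalR.StackExchangeRedis) and JWT bearer authentication; the connection string, authority, audience, and hub path are assumptions:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Redis backplane so alerts reach clients connected to any pod (connection string is a placeholder).
builder.Services.AddSignalR().AddStackExchangeRedis("localhost:6379");

builder.Services.AddAuthorization();
builder.Services
    .AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        options.Authority = "https://auth.example.com";
        options.Audience = "risk-api";

        // SignalR WebSocket/SSE clients pass the JWT in the access_token query string.
        options.Events = new JwtBearerEvents
        {
            OnMessageReceived = context =>
            {
                var accessToken = context.Request.Query["access_token"];
                if (!string.IsNullOrEmpty(accessToken) &&
                    context.HttpContext.Request.Path.StartsWithSegments("/hubs/risk-alerts"))
                {
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    });

var app = builder.Build();

app.UseAuthentication();
app.UseAuthorization();
app.MapHub<RiskAlertHub>("/hubs/risk-alerts");

app.Run();
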
# flink-conf.yaml example
state.backend: rocksdb
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Flink Prometheus Reporter
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250

📊 Visualize in Grafana: Kafka TPS, Flink latency percentiles, Redis consumption rate, and ABP request success/error rates.


5. Automated Testing 🧪

// Start test dependencies with Testcontainers (Testcontainers.Kafka / .Redis / .PostgreSql builders).
var kafka = new KafkaBuilder().Build();
var redis = new RedisBuilder().Build();
var postgres = new PostgreSqlBuilder().Build();
await Task.WhenAll(kafka.StartAsync(), redis.StartAsync(), postgres.StartAsync());

// Point CAP's Kafka transport at the container inside the ABP test module.
// (The outbox table name, e.g. "CapOutboxMessages", is configured on the CAP storage provider.)
context.Services.Configure<KafkaOptions>(opts =>
{
    opts.Servers = kafka.GetBootstrapAddress();
});

// Flink MiniCluster (Scala/Java test side; typically registered as a JUnit @ClassRule rather than started manually):
// val flinkCluster = new MiniClusterWithClientResource(
//   new MiniClusterResourceConfiguration.Builder().build())
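
On top of these containers, an end-to-end test could publish a suspicious transaction and assert that an alert shows up in the Redis Stream. A minimal xUnit sketch; RiskAlertTestFixture and its PublishTransactionAsync helper are hypothetical glue over the ABP API and the containers above:

using System;
using System.Threading.Tasks;
using StackExchange.Redis;
using Xunit;

public class RiskAlertIntegrationTests : IClassFixture<RiskAlertTestFixture>
{
    // Hypothetical fixture wiring: Redis comes from the Testcontainers setup above,
    // PublishTransactionAsync posts a transaction through the ABP API.
    private readonly IConnectionMultiplexer _redis;
    private readonly Func<Guid, decimal, string, Task> _publishTransactionAsync;

    public RiskAlertIntegrationTests(RiskAlertTestFixture fixture)
    {
        _redis = fixture.Redis;
        _publishTransactionAsync = fixture.PublishTransactionAsync;
    }

    [Fact]
    public async Task HighValueTransactionFromNewLocation_ShouldProduceRiskAlert()
    {
        var userId = Guid.NewGuid();

        // Two transactions matching the CEP pattern: > 10,000 followed by a different location within 5 minutes.
        await _publishTransactionAsync(userId, 15000m, "Shanghai");
        await _publishTransactionAsync(userId, 500m, "Beijing");

        // Poll the risk-alerts stream; the Flink job processes asynchronously, so allow a few seconds.
        var db = _redis.GetDatabase();
        var entries = Array.Empty<StreamEntry>();
        for (var i = 0; i < 20 && entries.Length == 0; i++)
        {
            entries = await db.StreamRangeAsync("risk-alerts");
            if (entries.Length == 0) await Task.Delay(500);
        }

        Assert.NotEmpty(entries);
    }
}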
