当前位置: 首页 > ds >正文

Project Reactor响应式编程简介

前言:Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。在响应式编程(如 Project Reactor)中,理解 发布(Publish)与订阅(Subscribe)、生产者(Producer)与消费者(Consumer) 的概念非常重要。它们是构建异步、非阻塞数据流的基础模型。


一、Reactor基本概念

1. 发布者(Publisher)

  • 是数据的提供方。

  • 在 Project Reactor 中,Flux 和 Mono 都实现了 Publisher<T> 接口。

  • 它不主动发送数据,而是等待被订阅后才开始发射数据。

类比:就像一个电台频道,在没有人收听时它不会“广播”内容,只有当有人打开收音机(订阅),才会开始播放节目。

Flux<String> publisher = Flux.just("A", "B", "C"); // Publisher

2. 订阅者(Subscriber)

  • 是数据的接收方。

  • 实现 Subscriber<T> 接口,或者使用 .subscribe() 方法作为简化方式。

  • 订阅者会通过回调方法接收数据(onNext)、异常(onError)或完成信号(onComplete)。

publisher.subscribe(data -> System.out.println("Received: " + data), // onNexterr -> System.err.println("Error: " + err),      // onError() -> System.out.println("Done!")                // onComplete
);

3. 订阅(Subscription)

  • 是连接 Publisher 和 Subscriber 的桥梁。

  • 每次调用 .subscribe() 都会创建一个新的 Subscription

  • 支持背压(backpressure)控制:消费者可以告诉生产者“我一次只能处理 N 个元素”。

publisher.subscribe(new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(1); // 请求第一个数据}@Overridepublic void onNext(String t) {System.out.println("Got: " + t);subscription.request(1); // 继续请求下一个}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Completed");}
});

二、生产者和消费者模型(Producer/Consumer)

角色描述
生产者(Producer)提供数据流的一方,即 Publisher(如 FluxMono
消费者(Consumer)接收并处理数据的一方,即 Subscriber
  • 数据从生产者流向消费者。

  • 这种模型支持异步非阻塞的数据传输。

  • 可以通过 背压机制 控制流量,避免消费者被过量数据淹没。


三、Reactor 中的发布与订阅流程

[Publisher] --> (onSubscribe) --> [Subscriber]↓(request)↓
[Publisher emits data via onNext]↓
[Subscriber receives data]↓
[Eventually onComplete or onError]

流程说明:

  1. 订阅建立

    • 调用 .subscribe() 后,Publisher 会调用 onSubscribe(Subscription)

  2. 请求数据

    • Subscriber 调用 subscription.request(n) 表示希望接收 n 个数据。

  3. 数据发射

    • Publisher 发射数据项,调用 onNext(T)

  4. 结束或错误

    • 成功结束:调用 onComplete()

    • 出错:调用 onError(Throwable)


 四、实际应用举例

示例:模拟生产者和消费者的协作(带背压)

Flux.range(1, 100).subscribe(new Subscriber<>() {private Subscription subscription;private int count = 0;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(5); // 初始请求5个数据}@Overridepublic void onNext(Integer integer) {System.out.println("Consuming: " + integer);count++;if (count % 5 == 0) {subscription.request(5); // 每消费5个再请求5个}}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("All items consumed!");}});

 五、常见误区

错误理解正确理解
Flux.just(...) 会立即发射数据不会,除非有订阅者才会发射
Flux 是热源(Hot)默认是冷源(Cold),每次订阅都会重新开始
subscribe() 返回值无关紧要可用于取消订阅(返回 Disposable
所有操作符都是同步的很多操作符是异步的,比如 flatMapdelayElements 等

六、总结

概念说明
Publisher数据源,如 Flux 或 Mono
Subscriber数据消费者,实现 onNextonErroronComplete
Subscription控制数据流动的接口,支持背压
生产者/消费者模型数据从生产者流向消费者,由订阅驱动
背压(Backpressure)消费者可以控制生产者的发射速率
冷流 vs 热流冷流每次订阅都从头开始;热流共享数据流(如 ConnectableFlux

如果你正在使用 Spring WebFluxRSocketKafka Streams 或其他响应式框架,理解这些核心概念将帮助你更好地设计和调试异步系统。

http://www.xdnf.cn/news/13700.html

相关文章:

  • 初识 Redis:从入门到应用的全面指南
  • 数字化动态ID随机水印和ID跑马灯实现教育视频防录屏
  • 数据治理域——离线数据开发
  • Codeforces 2025/6/11 日志
  • 项目练习:使用mybatis的foreach标签,实现union all的拼接语句
  • Nacos快速入门:从安装到实战
  • MySQL 8.0 OCP 英文题库解析(十七)
  • 打砖块(洛谷)
  • 移动端 1px 问题解决方案
  • 从字节到对象的漂流---JavaIO流篇
  • 5. 相机拍摄简单构图
  • 1.9 Express
  • Flutter 常用组件详解:Text、Button、Image、ListView 和 GridView
  • c++中main函数执行完后还执行其它语句吗?
  • FreeRTOS互斥量
  • 面向异构系统的多面体编译优化关键技术研究——李颖颖博士
  • Linux 任务调度策略
  • 一数一源一标准的补充
  • 论文阅读:强化预训练
  • 强化学习入门:交叉熵方法实现CartPole智能体
  • 一个超强的推理增强大模型,开源了,本地部署
  • 跨网数据摆渡系统:破解数据流通难题的“标准答案”
  • 别人如何访问我的内网呢? 设置让外网访问内网本地服务器和指定端口应用的几种方式
  • 曼昆《经济学原理》第九版 第十八章生产要素市场
  • Vue Electron 使用来给若依系统打包成exe程序,出现登录成功但是不跳转页面(已解决)
  • Vue 中 data 选项:对象 vs 函数
  • Rust 学习笔记:通过异步实现并发
  • 【题解-洛谷】P2935 [USACO09JAN] Best Spot S
  • 算法训练第十五天
  • docker推荐应用汇总及部署实战