当前位置: 首页 > web >正文

【Spring WebFlux】 三、响应式流规范与实战

文章目录

    • 一、响应式流规范背景
    • 二、响应式流规范接口
      • Publisher
      • Subscriber
      • Subscription
      • Processor
      • 底层机制
    • 三、响应式流规范实战
      • 异步生产者
      • 异步消费者
      • 测试类

一、响应式流规范背景

为了解决异步系统间的背压(Backpressure)问题(即生产者与消费者的速度不匹配)。

由 Netflix、Lightbend 等公司推动,统一了响应式编程中关于流处理的规范,规范中发布了一组接口,用于实现响应式流。

响应式流规范网址:http://www.reactive-streams.org

响应式流的规范文档
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md

maven仓库地址:

<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-tck</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-tck-flow</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-examples</artifactId><version>1.0.3</version>
</dependency>

响应式流规范定义了异步系统中组件间通过背压(Backpressure)进行交互的标准。

其核心目标是为了解决生产者与消费者之间的速率不匹配问题,确保数据流的高效、可控传递。

Java 9 通过 java.util.concurrent.Flow 类提供了对响应式流规范的官方适配。需注意:

Flow API 是规范而非具体实现,其语义与响应式流完全一致。

开发者需依赖第三方库(如 Project Reactor、Akka Streams)获得实际功能支持。

二、响应式流规范接口

规范定义了以下 4 个基础接口:

Publisher

数据流的生产者(数据源):提供一个 subscribe() 方法让消费者注册到生产者并接收数据,
作为生产者与消费者连接的标准化入口点

public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}

Subscriber

数据流的消费者,提供了一个 onSubscribe 方法来通知 Subscriber 订阅成功

public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
  • onSubscribe: 生产者在开始处理之前调用,并向消费者传递一个 Subscription 对象。
  • onNext: 用于通知消费者,生产者发布了新的数据项。
  • onError: 用于通知消费者,生产者遇到了异常,不再发布数据事件。
  • onComplete: 用于通知消费者,所有的数据事件都已发布完成。

Subscription

代表 订阅关系,用于控制流量,管理 Publisher 和 Subscriber 之间的交互

public interface Subscription {public void request(long n);public void cancel();
}
  • request: 用于让消费者通知生产者随后需要发布的元素数量,用于控制数据流量(背压管理)
  • cancel: 用于让消费者取消生产者随后的事件流,用于取消订阅(释放资源)

Processor

同时扮演 数据生产者(Publisher) 和 数据消费者(Subscriber) 的双重角色,用于在数据流中进行中间处理(如转换、过滤、聚合等)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

以上所有接口都存在于 org.reactivestreams 包中:

底层机制

在这里插入图片描述
Publisher 保证只有在 Subscriber 要求时才发送元素中新的部分。

Publisher 的整体实现既可以采用纯粹的阻塞式等待,也可以采用仅在 Subscriber 请求时才生成数据
的机制。

该规范为我们提供了混合推拉模型,此模型可以对背压进行合理控制。

另外,在某些情况下,可以优先考虑纯推模型。响应式流非常灵活,除动态推拉模型外,该规范还提供了独立的推模型和拉模型。

三、响应式流规范实战

异步生产者

package com.gwx.webflux.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;/*** 异步迭代 Publisher 实现,使用指定的 Executor 异步执行,* 并为指定的 Iterable 生成元素,以 unicast 形式为 Subscriber 提供数据。** <p>注意:本实现包含大量 try-catch 块,用于展示何时可以抛出异常,何时不能抛出异常。</p>** @param <T> 发布的数据类型*/
public class AsyncIterablePublisher<T> implements Publisher<T> {// 默认批次大小private static final int DEFAULT_BATCHSIZE = 1024;// 数据源或生成器private final Iterable<T> elements;// 用于异步执行的线程池private final Executor executor;// 单个线程处理的最大元素数量private final int batchSize;/*** 构造 AsyncIterablePublisher 实例** @param elements 元素生成器* @param executor 线程池*/public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {this(elements, DEFAULT_BATCHSIZE, executor);}/*** 构造 AsyncIterablePublisher 实例** @param elements  元素生成器* @param batchSize 批次大小* @param executor  线程池* @throws IllegalArgumentException 如果 batchSize 小于1* @throws NullPointerException     如果 elements 或 executor 为 null*/public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {if (elements == null) throw new NullPointerException("elements cannot be null");if (executor == null) throw new NullPointerException("executor cannot be null");if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero");this.elements = elements;this.executor = executor;this.batchSize = batchSize;}@Overridepublic void subscribe(final Subscriber<? super T> subscriber) {new SubscriptionImpl(subscriber).init();}// ========== 内部类和接口 ==========/*** 信号接口,用于订阅者和发布者之间的通信*/private interface Signal {}/*** 取消订阅信号*/private enum Cancel implements Signal {INSTANCE}/*** 订阅信号*/private enum Subscribe implements Signal {INSTANCE}/*** 发送数据信号*/private enum Send implements Signal {INSTANCE}/*** 请求数据信号*/private static final class Request implements Signal {final long n;Request(final long n) {this.n = n;}}/*** Subscription 实现类,处理订阅关系和数据流控制*/private final class SubscriptionImpl implements Subscription, Runnable {// 订阅者引用final Subscriber<? super T> subscriber;// 订阅是否已取消private volatile boolean cancelled = false;// 未处理的请求数量private long demand = 0;// 数据迭代器private Iterator<T> iterator;// 入站信号队列private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();// 防止并发执行的标志private final AtomicBoolean on = new AtomicBoolean(false);/*** 创建 Subscription 实例** @param subscriber 订阅者* @throws NullPointerException 如果 subscriber 为 null*/SubscriptionImpl(final Subscriber<? super T> subscriber) {if (subscriber == null) throw new NullPointerException("subscriber cannot be null");this.subscriber = subscriber;}// ========== 核心方法 ==========/*** 处理数据请求** @param n 请求的元素数量* @throws IllegalArgumentException 如果 n < 1*/private void doRequest(final long n) {if (n < 1) {terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));return;}// 处理 long 溢出情况if (demand + n < 1) {demand = Long.MAX_VALUE;} else {demand += n;}doSend();}/*** 取消订阅*/private void doCancel() {cancelled = true;}/*** 执行订阅逻辑*/private void doSubscribe() {try {iterator = elements.iterator();if (iterator == null) {iterator = Collections.<T>emptyList().iterator();}} catch (final Throwable t) {// 获取迭代器失败,发送错误信号subscriber.onSubscribe(new Subscription() {@Overridepublic void cancel() {}@Overridepublic void request(long n) {}});terminateDueTo(t);return;}if (!cancelled) {try {subscriber.onSubscribe(this);} catch (final Throwable t) {terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));return;}boolean hasElements;try {hasElements = iterator.hasNext();} catch (final Throwable t) {terminateDueTo(t);return;}if (!hasElements) {try {doCancel();subscriber.onComplete();} catch (final Throwable t) {// 记录 onComplete 抛出的异常new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t).printStackTrace(System.err);}}}}/*** 发送数据给订阅者*/private void doSend() {try {int leftInBatch = batchSize;do {T next;boolean hasNext;try {next = iterator.next();hasNext = iterator.hasNext();} catch (final Throwable t) {terminateDueTo(t);return;}subscriber.onNext(next);if (!hasNext) {doCancel();subscriber.onComplete();return;}} while (!cancelled && --leftInBatch > 0 && --demand > 0);if (!cancelled && demand > 0) {signal(Send.INSTANCE);}} catch (final Throwable t) {doCancel();new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t).printStackTrace(System.err);}}/*** 终止订阅并发送错误信号** @param t 错误原因*/private void terminateDueTo(final Throwable t) {cancelled = true;try {subscriber.onError(t);} catch (final Throwable t2) {new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);}}// ========== 信号处理 ==========/*** 发送信号到队列并尝试调度执行** @param signal 要发送的信号*/private void signal(final Signal signal) {if (inboundSignals.offer(signal)) {tryScheduleToExecute();}}/*** 尝试调度任务到线程池执行*/private void tryScheduleToExecute() {if (on.compareAndSet(false, true)) {try {executor.execute(this);} catch (Throwable t) {if (!cancelled) {doCancel();try {terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));} finally {inboundSignals.clear();on.set(false);}}}}}@Overridepublic void run() {if (on.get()) {try {final Signal s = inboundSignals.poll();if (!cancelled) {if (s instanceof Request) {doRequest(((Request) s).n);} else if (s == Send.INSTANCE) {doSend();} else if (s == Cancel.INSTANCE) {doCancel();} else if (s == Subscribe.INSTANCE) {doSubscribe();}}} finally {on.set(false);if (!inboundSignals.isEmpty()) {tryScheduleToExecute();}}}}// ========== Subscription 接口实现 ==========@Overridepublic void request(final long n) {signal(new Request(n));}@Overridepublic void cancel() {signal(Cancel.INSTANCE);}/*** 初始化订阅,发送订阅信号*/void init() {signal(Subscribe.INSTANCE);}}
}

异步消费者

package com.gwx.webflux.demo;import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;/*** 基于 Executor 异步运行的订阅者实现,一次请求一个元素,* 然后对每个元素调用用户定义的方法进行处理。** <p>注意:本类包含大量 try-catch 块,用于说明何时可以抛出异常,何时不能抛出异常。</p>** @param <T> 订阅的数据类型*/
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {// ========== 内部信号定义 ==========/*** 发布者和订阅者之间的异步协议信号接口*/private interface Signal {}/*** 数据流完成信号*/private enum OnComplete implements Signal {INSTANCE}/*** 错误信号*/private static class OnError implements Signal {final Throwable error;OnError(Throwable error) {this.error = error;}}/*** 数据项信号*/private static class OnNext<T> implements Signal {final T next;OnNext(T next) {this.next = next;}}/*** 订阅成功信号*/private static class OnSubscribe implements Signal {final Subscription subscription;OnSubscribe(Subscription subscription) {this.subscription = subscription;}}// ========== 成员变量 ==========private Subscription subscription;  // 订阅关系private boolean done;             // 是否已完成处理private final Executor executor;  // 异步执行线程池// 入站信号队列private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();// 防止并发执行的标志private final AtomicBoolean on = new AtomicBoolean(false);// ========== 构造方法 ==========/*** 创建 AsyncSubscriber 实例** @param executor 用于异步处理的线程池* @throws NullPointerException 如果 executor 为 null*/protected AsyncSubscriber(Executor executor) {if (executor == null) throw new NullPointerException("executor cannot be null");this.executor = executor;}// ========== 抽象方法 ==========/*** 处理下一个元素** @param element 接收到的元素* @return 是否还需要更多元素*/protected abstract boolean whenNext(T element);/*** 处理完成信号*/protected void whenComplete() {}/*** 处理错误信号** @param error 错误原因*/protected void whenError(Throwable error) {}// ========== 核心方法 ==========/*** 标记订阅者已完成处理*/private void done() {done = true;if (subscription != null) {try {subscription.cancel();} catch (Throwable t) {new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t).printStackTrace(System.err);}}}/*** 处理订阅信号** @param s 订阅关系*/private void handleOnSubscribe(Subscription s) {if (s == null) {// 忽略 null Subscriptionreturn;}if (subscription != null) {// 重复订阅,取消新订阅try {s.cancel();} catch (Throwable t) {new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t).printStackTrace(System.err);}} else {subscription = s;try {// 每次只请求一个元素s.request(1);} catch (Throwable t) {new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t).printStackTrace(System.err);}}}/*** 处理数据信号** @param element 接收到的元素*/private void handleOnNext(T element) {if (done) return;if (subscription == null) {new IllegalStateException("Violated Reactive Streams rule 1.09 and 2.1 by signalling OnNext before Subscription.request").printStackTrace(System.err);return;}try {if (whenNext(element)) {try {// 处理成功后请求下一个元素subscription.request(1);} catch (Throwable t) {new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t).printStackTrace(System.err);}} else {done();}} catch (Throwable t) {done();try {onError(t);} catch (Throwable t2) {new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);}}}/*** 处理完成信号*/private void handleOnComplete() {if (subscription == null) {new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.").printStackTrace(System.err);return;}done = true;whenComplete();}/*** 处理错误信号** @param error 错误原因*/private void handleOnError(Throwable error) {if (subscription == null) {new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.").printStackTrace(System.err);return;}done = true;whenError(error);}// ========== Subscriber 接口实现 ==========@Overridepublic final void onSubscribe(Subscription s) {if (s == null) throw new NullPointerException("Subscription cannot be null");signal(new OnSubscribe(s));}@Overridepublic final void onNext(T element) {if (element == null) throw new NullPointerException("Element cannot be null");signal(new OnNext<>(element));}@Overridepublic final void onError(Throwable t) {if (t == null) throw new NullPointerException("Throwable cannot be null");signal(new OnError(t));}@Overridepublic final void onComplete() {signal(OnComplete.INSTANCE);}// ========== Runnable 接口实现 ==========@Overridepublic final void run() {if (on.get()) {try {Signal s = inboundSignals.poll();if (!done) {if (s instanceof OnNext) {handleOnNext(((OnNext<T>) s).next);} else if (s instanceof OnSubscribe) {handleOnSubscribe(((OnSubscribe) s).subscription);} else if (s instanceof OnError) {handleOnError(((OnError) s).error);} else if (s == OnComplete.INSTANCE) {handleOnComplete();}}} finally {on.set(false);if (!inboundSignals.isEmpty()) {tryScheduleToExecute();}}}}// ========== 信号处理 ==========/*** 发送信号到队列** @param signal 要发送的信号*/private void signal(Signal signal) {if (inboundSignals.offer(signal)) {tryScheduleToExecute();}}/*** 尝试调度任务到线程池执行*/private void tryScheduleToExecute() {if (on.compareAndSet(false, true)) {try {executor.execute(this);} catch (Throwable t) {if (!done) {try {done();} finally {inboundSignals.clear();on.set(false);}}}}}
}

测试类

public class ReactiveTest {public static void main(String[] args) {// 测试1:基本功能测试System.out.println("=== 测试1:基本功能测试 ===");testBasicFunctionality();// 测试2:异常处理测试System.out.println("\n=== 测试2:异常处理测试 ===");testErrorHandling();// 测试3:大数据量测试System.out.println("\n=== 测试3:大数据量测试 ===");testLargeDataSet();}private static void testBasicFunctionality() {Set<Integer> elements = new HashSet<>();for (int i = 1; i <= 5; i++) {elements.add(i);}ExecutorService executor = Executors.newFixedThreadPool(3);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {@Overrideprotected boolean whenNext(Integer element) {System.out.println("处理元素: " + element);return true;}@Overrideprotected void whenComplete() {System.out.println("所有元素处理完成");executor.shutdown();}});}private static void testErrorHandling() {Set<Integer> elements = new HashSet<>();elements.add(1);elements.add(2);elements.add(3);ExecutorService executor = Executors.newFixedThreadPool(2);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {@Overrideprotected boolean whenNext(Integer element) {System.out.println("处理元素: " + element);if (element == 2) {throw new RuntimeException("模拟处理元素2时出错");}return true;}@Overrideprotected void whenError(Throwable error) {System.err.println("发生错误: " + error.getMessage());executor.shutdown();}});}private static void testLargeDataSet() {Set<Integer> elements = new HashSet<>();for (int i = 1; i <= 1000; i++) {elements.add(i);}ExecutorService executor = Executors.newFixedThreadPool(10);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);long startTime = System.currentTimeMillis();publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newFixedThreadPool(4)) {private int count = 0;@Overrideprotected boolean whenNext(Integer element) {count++;if (count % 100 == 0) {System.out.println("已处理 " + count + " 个元素");}return true;}@Overrideprotected void whenComplete() {long duration = System.currentTimeMillis() - startTime;System.out.println("处理完成,共处理 " + count + " 个元素,耗时 " + duration + " 毫秒");executor.shutdown();}});}
}
http://www.xdnf.cn/news/16652.html

相关文章:

  • Java 笔记 serialVersionUID
  • ADB+Python控制(有线/无线) Scrcpy+按键映射(推荐)
  • 服务器查日志太慢,试试grep组合拳
  • 时序数据库选型指南:工业大数据场景下基于Apache IoTDB技术价值与实践路径
  • 5 分钟上手 Firecrawl
  • 【办公类-109-01】20250728托小班新生挂牌(学号姓名)
  • API产品升级丨全知科技发布「知影-API风险监测平台」:以AI重构企业数据接口安全治理新范式
  • 企业级日志分析系统ELK
  • Pycaita二次开发基础代码解析:点距测量、对象层级关系与选择机制深度剖析
  • 基于DeepSeek大模型和STM32的矿井“围压-温度-开采扰动“三位一体智能监测系统设计
  • 边缘计算+前端实时性:本地化数据处理在设备监控中的响应优化实践
  • vue element 封装表单
  • STM32时钟源
  • GaussDB as的用法
  • 【氮化镓】GaN同质外延p-i-n二极管中星形与三角形扩展表面缺陷的电子特性
  • 力扣 hot100 Day58
  • LeetCode 2044.统计按位或能得到最大值的子集数目:二进制枚举/DFS回溯(剪枝)
  • 介绍一下static关键字
  • IP协议解析:从寻址到路由
  • MCP协议全景解析:从工业总线到AI智能体的连接革命
  • 【基础篇三】WebSocket:实时通信的革命
  • CDN架构全景图
  • 硕博电子大功率IO模块
  • opencv学习(轮廓检测)
  • 【论文阅读】Safety Alignment Should Be Made More Than Just a Few Tokens Deep
  • 微型化IMU如何突破无人机与机器人的性能边界?
  • 数据开源 | “白虎”数据集首批开源,迈出百万数据征途第一步
  • 医疗人工智能高质量数据集和语料库建设路径探析
  • linux安装zsh,oh-my-zsh,配置zsh主题及插件的方法
  • 3. Socket 编程 TCP