【Spring WebFlux】 三、响应式流规范与实战
文章目录
- 一、响应式流规范背景
- 二、响应式流规范接口
- Publisher
- Subscriber
- Subscription
- Processor
- 底层机制
- 三、响应式流规范实战
- 异步生产者
- 异步消费者
- 测试类
一、响应式流规范背景
为了解决异步系统间的背压(Backpressure)问题(即生产者与消费者的速度不匹配)。
由 Netflix、Lightbend 等公司推动,统一了响应式编程中关于流处理的规范,规范中发布了一组接口,用于实现响应式流。
响应式流规范网址:http://www.reactive-streams.org
响应式流的规范文档:
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md
maven仓库地址:
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-tck</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-tck-flow</artifactId><version>1.0.3</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-examples</artifactId><version>1.0.3</version>
</dependency>
响应式流规范定义了异步系统中组件间通过背压(Backpressure)进行交互的标准。
其核心目标是为了解决生产者与消费者之间的速率不匹配问题,确保数据流的高效、可控传递。
Java 9 通过 java.util.concurrent.Flow 类提供了对响应式流规范的官方适配。需注意:
Flow API 是规范而非具体实现,其语义与响应式流完全一致。
开发者需依赖第三方库(如 Project Reactor、Akka Streams)获得实际功能支持。
二、响应式流规范接口
规范定义了以下 4 个基础接口:
Publisher
数据流的生产者(数据源):提供一个 subscribe() 方法让消费者注册到生产者并接收数据,
作为生产者与消费者连接的标准化入口点
public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}
Subscriber
数据流的消费者,提供了一个 onSubscribe 方法来通知 Subscriber 订阅成功
public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
- onSubscribe: 生产者在开始处理之前调用,并向消费者传递一个 Subscription 对象。
- onNext: 用于通知消费者,生产者发布了新的数据项。
- onError: 用于通知消费者,生产者遇到了异常,不再发布数据事件。
- onComplete: 用于通知消费者,所有的数据事件都已发布完成。
Subscription
代表 订阅关系,用于控制流量,管理 Publisher 和 Subscriber 之间的交互
public interface Subscription {public void request(long n);public void cancel();
}
- request: 用于让消费者通知生产者随后需要发布的元素数量,用于控制数据流量(背压管理)
- cancel: 用于让消费者取消生产者随后的事件流,用于取消订阅(释放资源)
Processor
同时扮演 数据生产者(Publisher) 和 数据消费者(Subscriber) 的双重角色,用于在数据流中进行中间处理(如转换、过滤、聚合等)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
以上所有接口都存在于 org.reactivestreams 包中:
底层机制
Publisher 保证只有在 Subscriber 要求时才发送元素中新的部分。
Publisher 的整体实现既可以采用纯粹的阻塞式等待,也可以采用仅在 Subscriber 请求时才生成数据
的机制。
该规范为我们提供了混合推拉模型,此模型可以对背压进行合理控制。
另外,在某些情况下,可以优先考虑纯推模型。响应式流非常灵活,除动态推拉模型外,该规范还提供了独立的推模型和拉模型。
三、响应式流规范实战
异步生产者
package com.gwx.webflux.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;/*** 异步迭代 Publisher 实现,使用指定的 Executor 异步执行,* 并为指定的 Iterable 生成元素,以 unicast 形式为 Subscriber 提供数据。** <p>注意:本实现包含大量 try-catch 块,用于展示何时可以抛出异常,何时不能抛出异常。</p>** @param <T> 发布的数据类型*/
public class AsyncIterablePublisher<T> implements Publisher<T> {// 默认批次大小private static final int DEFAULT_BATCHSIZE = 1024;// 数据源或生成器private final Iterable<T> elements;// 用于异步执行的线程池private final Executor executor;// 单个线程处理的最大元素数量private final int batchSize;/*** 构造 AsyncIterablePublisher 实例** @param elements 元素生成器* @param executor 线程池*/public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {this(elements, DEFAULT_BATCHSIZE, executor);}/*** 构造 AsyncIterablePublisher 实例** @param elements 元素生成器* @param batchSize 批次大小* @param executor 线程池* @throws IllegalArgumentException 如果 batchSize 小于1* @throws NullPointerException 如果 elements 或 executor 为 null*/public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {if (elements == null) throw new NullPointerException("elements cannot be null");if (executor == null) throw new NullPointerException("executor cannot be null");if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero");this.elements = elements;this.executor = executor;this.batchSize = batchSize;}@Overridepublic void subscribe(final Subscriber<? super T> subscriber) {new SubscriptionImpl(subscriber).init();}// ========== 内部类和接口 ==========/*** 信号接口,用于订阅者和发布者之间的通信*/private interface Signal {}/*** 取消订阅信号*/private enum Cancel implements Signal {INSTANCE}/*** 订阅信号*/private enum Subscribe implements Signal {INSTANCE}/*** 发送数据信号*/private enum Send implements Signal {INSTANCE}/*** 请求数据信号*/private static final class Request implements Signal {final long n;Request(final long n) {this.n = n;}}/*** Subscription 实现类,处理订阅关系和数据流控制*/private final class SubscriptionImpl implements Subscription, Runnable {// 订阅者引用final Subscriber<? super T> subscriber;// 订阅是否已取消private volatile boolean cancelled = false;// 未处理的请求数量private long demand = 0;// 数据迭代器private Iterator<T> iterator;// 入站信号队列private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();// 防止并发执行的标志private final AtomicBoolean on = new AtomicBoolean(false);/*** 创建 Subscription 实例** @param subscriber 订阅者* @throws NullPointerException 如果 subscriber 为 null*/SubscriptionImpl(final Subscriber<? super T> subscriber) {if (subscriber == null) throw new NullPointerException("subscriber cannot be null");this.subscriber = subscriber;}// ========== 核心方法 ==========/*** 处理数据请求** @param n 请求的元素数量* @throws IllegalArgumentException 如果 n < 1*/private void doRequest(final long n) {if (n < 1) {terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));return;}// 处理 long 溢出情况if (demand + n < 1) {demand = Long.MAX_VALUE;} else {demand += n;}doSend();}/*** 取消订阅*/private void doCancel() {cancelled = true;}/*** 执行订阅逻辑*/private void doSubscribe() {try {iterator = elements.iterator();if (iterator == null) {iterator = Collections.<T>emptyList().iterator();}} catch (final Throwable t) {// 获取迭代器失败,发送错误信号subscriber.onSubscribe(new Subscription() {@Overridepublic void cancel() {}@Overridepublic void request(long n) {}});terminateDueTo(t);return;}if (!cancelled) {try {subscriber.onSubscribe(this);} catch (final Throwable t) {terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));return;}boolean hasElements;try {hasElements = iterator.hasNext();} catch (final Throwable t) {terminateDueTo(t);return;}if (!hasElements) {try {doCancel();subscriber.onComplete();} catch (final Throwable t) {// 记录 onComplete 抛出的异常new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t).printStackTrace(System.err);}}}}/*** 发送数据给订阅者*/private void doSend() {try {int leftInBatch = batchSize;do {T next;boolean hasNext;try {next = iterator.next();hasNext = iterator.hasNext();} catch (final Throwable t) {terminateDueTo(t);return;}subscriber.onNext(next);if (!hasNext) {doCancel();subscriber.onComplete();return;}} while (!cancelled && --leftInBatch > 0 && --demand > 0);if (!cancelled && demand > 0) {signal(Send.INSTANCE);}} catch (final Throwable t) {doCancel();new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t).printStackTrace(System.err);}}/*** 终止订阅并发送错误信号** @param t 错误原因*/private void terminateDueTo(final Throwable t) {cancelled = true;try {subscriber.onError(t);} catch (final Throwable t2) {new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);}}// ========== 信号处理 ==========/*** 发送信号到队列并尝试调度执行** @param signal 要发送的信号*/private void signal(final Signal signal) {if (inboundSignals.offer(signal)) {tryScheduleToExecute();}}/*** 尝试调度任务到线程池执行*/private void tryScheduleToExecute() {if (on.compareAndSet(false, true)) {try {executor.execute(this);} catch (Throwable t) {if (!cancelled) {doCancel();try {terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));} finally {inboundSignals.clear();on.set(false);}}}}}@Overridepublic void run() {if (on.get()) {try {final Signal s = inboundSignals.poll();if (!cancelled) {if (s instanceof Request) {doRequest(((Request) s).n);} else if (s == Send.INSTANCE) {doSend();} else if (s == Cancel.INSTANCE) {doCancel();} else if (s == Subscribe.INSTANCE) {doSubscribe();}}} finally {on.set(false);if (!inboundSignals.isEmpty()) {tryScheduleToExecute();}}}}// ========== Subscription 接口实现 ==========@Overridepublic void request(final long n) {signal(new Request(n));}@Overridepublic void cancel() {signal(Cancel.INSTANCE);}/*** 初始化订阅,发送订阅信号*/void init() {signal(Subscribe.INSTANCE);}}
}
异步消费者
package com.gwx.webflux.demo;import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;/*** 基于 Executor 异步运行的订阅者实现,一次请求一个元素,* 然后对每个元素调用用户定义的方法进行处理。** <p>注意:本类包含大量 try-catch 块,用于说明何时可以抛出异常,何时不能抛出异常。</p>** @param <T> 订阅的数据类型*/
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {// ========== 内部信号定义 ==========/*** 发布者和订阅者之间的异步协议信号接口*/private interface Signal {}/*** 数据流完成信号*/private enum OnComplete implements Signal {INSTANCE}/*** 错误信号*/private static class OnError implements Signal {final Throwable error;OnError(Throwable error) {this.error = error;}}/*** 数据项信号*/private static class OnNext<T> implements Signal {final T next;OnNext(T next) {this.next = next;}}/*** 订阅成功信号*/private static class OnSubscribe implements Signal {final Subscription subscription;OnSubscribe(Subscription subscription) {this.subscription = subscription;}}// ========== 成员变量 ==========private Subscription subscription; // 订阅关系private boolean done; // 是否已完成处理private final Executor executor; // 异步执行线程池// 入站信号队列private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();// 防止并发执行的标志private final AtomicBoolean on = new AtomicBoolean(false);// ========== 构造方法 ==========/*** 创建 AsyncSubscriber 实例** @param executor 用于异步处理的线程池* @throws NullPointerException 如果 executor 为 null*/protected AsyncSubscriber(Executor executor) {if (executor == null) throw new NullPointerException("executor cannot be null");this.executor = executor;}// ========== 抽象方法 ==========/*** 处理下一个元素** @param element 接收到的元素* @return 是否还需要更多元素*/protected abstract boolean whenNext(T element);/*** 处理完成信号*/protected void whenComplete() {}/*** 处理错误信号** @param error 错误原因*/protected void whenError(Throwable error) {}// ========== 核心方法 ==========/*** 标记订阅者已完成处理*/private void done() {done = true;if (subscription != null) {try {subscription.cancel();} catch (Throwable t) {new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t).printStackTrace(System.err);}}}/*** 处理订阅信号** @param s 订阅关系*/private void handleOnSubscribe(Subscription s) {if (s == null) {// 忽略 null Subscriptionreturn;}if (subscription != null) {// 重复订阅,取消新订阅try {s.cancel();} catch (Throwable t) {new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t).printStackTrace(System.err);}} else {subscription = s;try {// 每次只请求一个元素s.request(1);} catch (Throwable t) {new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t).printStackTrace(System.err);}}}/*** 处理数据信号** @param element 接收到的元素*/private void handleOnNext(T element) {if (done) return;if (subscription == null) {new IllegalStateException("Violated Reactive Streams rule 1.09 and 2.1 by signalling OnNext before Subscription.request").printStackTrace(System.err);return;}try {if (whenNext(element)) {try {// 处理成功后请求下一个元素subscription.request(1);} catch (Throwable t) {new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t).printStackTrace(System.err);}} else {done();}} catch (Throwable t) {done();try {onError(t);} catch (Throwable t2) {new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);}}}/*** 处理完成信号*/private void handleOnComplete() {if (subscription == null) {new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.").printStackTrace(System.err);return;}done = true;whenComplete();}/*** 处理错误信号** @param error 错误原因*/private void handleOnError(Throwable error) {if (subscription == null) {new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.").printStackTrace(System.err);return;}done = true;whenError(error);}// ========== Subscriber 接口实现 ==========@Overridepublic final void onSubscribe(Subscription s) {if (s == null) throw new NullPointerException("Subscription cannot be null");signal(new OnSubscribe(s));}@Overridepublic final void onNext(T element) {if (element == null) throw new NullPointerException("Element cannot be null");signal(new OnNext<>(element));}@Overridepublic final void onError(Throwable t) {if (t == null) throw new NullPointerException("Throwable cannot be null");signal(new OnError(t));}@Overridepublic final void onComplete() {signal(OnComplete.INSTANCE);}// ========== Runnable 接口实现 ==========@Overridepublic final void run() {if (on.get()) {try {Signal s = inboundSignals.poll();if (!done) {if (s instanceof OnNext) {handleOnNext(((OnNext<T>) s).next);} else if (s instanceof OnSubscribe) {handleOnSubscribe(((OnSubscribe) s).subscription);} else if (s instanceof OnError) {handleOnError(((OnError) s).error);} else if (s == OnComplete.INSTANCE) {handleOnComplete();}}} finally {on.set(false);if (!inboundSignals.isEmpty()) {tryScheduleToExecute();}}}}// ========== 信号处理 ==========/*** 发送信号到队列** @param signal 要发送的信号*/private void signal(Signal signal) {if (inboundSignals.offer(signal)) {tryScheduleToExecute();}}/*** 尝试调度任务到线程池执行*/private void tryScheduleToExecute() {if (on.compareAndSet(false, true)) {try {executor.execute(this);} catch (Throwable t) {if (!done) {try {done();} finally {inboundSignals.clear();on.set(false);}}}}}
}
测试类
public class ReactiveTest {public static void main(String[] args) {// 测试1:基本功能测试System.out.println("=== 测试1:基本功能测试 ===");testBasicFunctionality();// 测试2:异常处理测试System.out.println("\n=== 测试2:异常处理测试 ===");testErrorHandling();// 测试3:大数据量测试System.out.println("\n=== 测试3:大数据量测试 ===");testLargeDataSet();}private static void testBasicFunctionality() {Set<Integer> elements = new HashSet<>();for (int i = 1; i <= 5; i++) {elements.add(i);}ExecutorService executor = Executors.newFixedThreadPool(3);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {@Overrideprotected boolean whenNext(Integer element) {System.out.println("处理元素: " + element);return true;}@Overrideprotected void whenComplete() {System.out.println("所有元素处理完成");executor.shutdown();}});}private static void testErrorHandling() {Set<Integer> elements = new HashSet<>();elements.add(1);elements.add(2);elements.add(3);ExecutorService executor = Executors.newFixedThreadPool(2);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {@Overrideprotected boolean whenNext(Integer element) {System.out.println("处理元素: " + element);if (element == 2) {throw new RuntimeException("模拟处理元素2时出错");}return true;}@Overrideprotected void whenError(Throwable error) {System.err.println("发生错误: " + error.getMessage());executor.shutdown();}});}private static void testLargeDataSet() {Set<Integer> elements = new HashSet<>();for (int i = 1; i <= 1000; i++) {elements.add(i);}ExecutorService executor = Executors.newFixedThreadPool(10);AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);long startTime = System.currentTimeMillis();publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newFixedThreadPool(4)) {private int count = 0;@Overrideprotected boolean whenNext(Integer element) {count++;if (count % 100 == 0) {System.out.println("已处理 " + count + " 个元素");}return true;}@Overrideprotected void whenComplete() {long duration = System.currentTimeMillis() - startTime;System.out.println("处理完成,共处理 " + count + " 个元素,耗时 " + duration + " 毫秒");executor.shutdown();}});}
}