响应式编程框架Reactor【3】
文章目录
- 六、错误处理与背压
- 6.1 错误处理操作符
- 6.1.1 onErrorReturn - 发生错误时返回默认值
- 6.1.2 onErrorResume - 发生错误时切换到备用流
- 6.1.3 onErrorContinue
- 6.1.4 retry - 重试操作
- 6.1.5 timeout - 操作超时处理
- 6.1.6 retryWhen
- 6.1.7 doOnError
- 6.1.8 错误处理流程与策略
- 6.1.9 最佳实践
- 6.1.10 总结
- 6.2 背压处理
- 6.2.1 背压的核心原理
- 6.2.2 Reactor 中的背压策略
- 6.2.3 热流与冷流的背压差异
- 6.2.4 背压监控与调度
- 6.2.5 背压最佳实践
- 6.2.6 limitRate
- 6.2.7 limitRate + 背压策略
- 6.2.8 与类似操作符的区别
- 6.2.9 最佳实践
- 6.2.10 总结
六、错误处理与背压
6.1 错误处理操作符
package cn.tcmeta;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;import java.time.Duration;public class ErrorHandlingExamples {public void errorHandlingOperators() {Flux<Integer> numbers = Flux.range(1, 5).map(n -> {if (n == 3) throw new RuntimeException("Boom on 3!");return n;});// onErrorReturn - 发生错误时返回默认值Flux<Integer> withDefault = numbers.onErrorReturn(0);// onErrorResume - 发生错误时切换到备用流Flux<Integer> withFallback = numbers.onErrorResume(e ->Flux.just(9, 8, 7));// onErrorContinue - 发生错误时继续处理(跳过错误元素)Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->System.out.println("Error with value " + value + ": " + e.getMessage()));// retry - 重试操作Flux<Integer> withRetry = numbers.retry(2); // 最多重试2次// retryWhen - 基于条件的重试Flux<Integer> withConditionalRetry = numbers.retryWhen(Retry.withThrowable(retries -> retries.zipWith(Flux.range(1, 3),(error, index) -> {if (index < 3) {return Duration.ofSeconds(index); // 指数退避} else {throw new RuntimeException("Retries exhausted", error);}})));// timeout - 操作超时处理Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1)).timeout(Duration.ofMillis(500)).onErrorResume(e -> Flux.just(-1));}public void errorHandlingInPractice() {// 模拟外部服务调用Mono<String> externalServiceCall = Mono.fromCallable(() -> {if (Math.random() > 0.7) {throw new RuntimeException("Service unavailable");}return "Service response";});// 添加弹性模式Mono<String> resilientCall = externalServiceCall.timeout(Duration.ofSeconds(2)).retryWhen(Retry.backoff(3, Duration.ofMillis(100)).doBeforeRetry(e -> Mono.just("Fallback response")));resilientCall.subscribe(System.out::println,error -> System.err.println("Unexpected error: " + error),() -> System.out.println("Completed"));}
}
在 Reactor 响应式编程中,错误处理是确保流稳定性的核心环节。与传统编程的异常处理不同,Reactor 的错误具有终止性:一旦流中发生错误(通过onError
信号),当前流会立即终止,后续元素不再发射。
错误处理操作符的作用是捕获错误、恢复流、转换错误或重试,避免错误直接传递到订阅者导致整个流程中断。
Reactor 错误处理操作符主要解决四类问题:
- 错误恢复:发生错误时,用默认值或备用流继续处理
- 错误转换:将原始错误转换为更有意义的业务错误
- 错误重试:对临时错误(如网络波动)进行重试
- 错误通知:仅记录错误不中断流(需配合恢复机制)
6.1.1 onErrorReturn - 发生错误时返回默认值
public final Flux<T> onErrorReturn(T fallbackValue) {Objects.requireNonNull(fallbackValue, "fallbackValue must not be null");return onAssembly(new FluxOnErrorReturn<>(this, null, fallbackValue));
}
这类操作符在捕获错误后,会生成新的元素或流,使整个流能够正常完成(而非错误终止)。
- 作用:发生错误时,返回一个预设的默认值,流以该值结束并正常完成。
- 特点:简单直接,适合已知错误且有明确默认值的场景。
public class OnErrorReturnExample {public static void main(String[] args) {// 模拟一个可能出错的流(第3个元素出错)Flux<Integer> errorProneFlux = Flux.range(1, 5).map(num -> {if (num == 3) {throw new RuntimeException("处理数字3时出错");}return num;});// 使用onErrorReturn:出错时返回默认值0errorProneFlux.onErrorReturn(0).subscribe(num -> System.out.println("接收元素: " + num),error -> System.err.println("未被触发的错误处理"), // 不会执行() -> System.out.println("流正常完成") // 会执行);}
}
接收元素: 1
接收元素: 2
接收元素: 0 // 错误发生时返回的默认值
流正常完成 // 流正常结束,而非错误终止
6.1.2 onErrorResume - 发生错误时切换到备用流
public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {return onAssembly(new FluxOnErrorResume<>(this, fallback));
}
- 作用:发生错误时,切换到一个备用流(
Publisher
),用备用流的元素继续处理。 - 特点:比
onErrorReturn
更灵活,可根据错误类型返回不同的备用流。
public class OnErrorResumeExample {public static void main(String[] args) {// 模拟可能发生两种错误的流Flux<String> dataStream = Flux.just("a", "b", "c").map(data -> {if (data.equals("b")) {throw new IllegalArgumentException("无效数据: b"); // 业务错误}if (data.equals("c")) {throw new RuntimeException("系统错误"); // 系统错误}return data.toUpperCase();});// 使用onErrorResume:根据错误类型返回不同备用流dataStream.onErrorResume(error -> {// 对业务错误返回特定备用流if (error instanceof IllegalArgumentException) {return Flux.just("B(备用)", "B1(备用)");}// 对系统错误返回默认备用流else {return Mono.just("系统错误备用值");}}).subscribe(result -> System.out.println("接收结果: " + result),error -> System.err.println("未被触发的错误处理"),() -> System.out.println("流正常完成"));}
}
接收结果: A
接收结果: B(备用)
接收结果: B1(备用)
流正常完成
6.1.3 onErrorContinue
- 发生错误时继续处理(跳过错误元素)
public final Flux<T> onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) {BiConsumer<Throwable, Object> genericConsumer = errorConsumer;return contextWriteSkippingContextPropagation(Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,OnNextFailureStrategy.resume(genericConsumer)));
}
// onErrorContinue - 发生错误时继续处理(跳过错误元素)
Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->System.out.println("Error with value " + value + ": " + e.getMessage())
);
6.1.4 retry - 重试操作
public final Flux<T> retry(long numRetries) {return onAssembly(new FluxRetry<>(this, numRetries));
}
- 作用:发生错误时,重新订阅原始流,最多重试
numRetries
次。 - 特点:简单重试,不区分错误类型,适合已知偶发错误的场景。
public class RetryExample {public static void main(String[] args) {AtomicInteger attemptCount = new AtomicInteger(0); // 记录尝试次数// 模拟可能临时失败的操作(前2次失败,第3次成功)Flux<String> operation = Flux.defer(() -> {int attempt = attemptCount.incrementAndGet();System.out.println("执行第" + attempt + "次尝试");if (attempt < 3) {throw new RuntimeException("第" + attempt + "次尝试失败(临时错误)");}return Flux.just("第" + attempt + "次尝试成功");});// 使用retry(2):最多重试2次(总尝试次数=1+2=3)operation.retry(2) // 允许重试2次.subscribe(result -> System.out.println("成功: " + result),error -> System.err.println("最终失败: " + error.getMessage()));}
}
执行第1次尝试
执行第2次尝试(重试1)
执行第3次尝试(重试2)
成功: 第3次尝试成功
6.1.5 timeout - 操作超时处理
public final Flux<T> timeout(Duration timeout) {return timeout(timeout, null, Schedulers.parallel());
}
Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1)).timeout(Duration.ofMillis(500)) // timeout - 操作超时处理.onErrorResume(e -> Flux.just(-1));
6.1.6 retryWhen
public final Flux<T> retryWhen(Retry retrySpec) {return onAssembly(new FluxRetryWhen<>(this, retrySpec));
}
- 作用:基于错误信号流(
Flux<Throwable>
)动态控制重试策略(如指数退避、条件重试)。 - 特点:高度灵活,支持复杂重试逻辑(如根据错误类型决定是否重试、设置重试间隔)。
public class RetryWhenExample {public static void main(String[] args) throws InterruptedException {AtomicInteger attemptCount = new AtomicInteger(0);// 模拟网络请求(前3次失败,第4次成功)Flux<String> networkCall = Flux.defer(() -> {int attempt = attemptCount.incrementAndGet();System.out.println("执行第" + attempt + "次网络请求");if (attempt < 4) {throw new RuntimeException("网络超时");}return Flux.just("请求成功,数据: {id:1}");});// 定义指数退避重试策略:最多重试3次,间隔依次为1s、2s、4sRetry retryStrategy = Retry.backoff(3, Duration.ofSeconds(1)).jitter(0.1) // 增加10%随机抖动,避免重试风暴.filter(error -> error instanceof RuntimeException) // 只对特定错误重试.onRetryExhaustedThrow((retrySpec, signal) -> new RuntimeException("重试耗尽,最终失败", signal.failure()));// 使用retryWhen应用策略networkCall.retryWhen(retryStrategy).subscribe(result -> System.out.println("接收结果: " + result),error -> System.err.println("最终错误: " + error.getMessage()));// 等待所有重试完成(总耗时≈1+2+4=7秒)Thread.sleep(8000);}
}
输出结果(间隔随重试次数指数增长):
执行第1次网络请求
执行第2次网络请求(1秒后重试)
执行第3次网络请求(2秒后重试)
执行第4次网络请求(4秒后重试)
接收结果: 请求成功,数据: {id:1}
6.1.7 doOnError
- 作用:错误发生时执行副作用(如日志记录、报警),但不处理错误本身。
- 特点:流仍会以错误终止,需与
onErrorResume
等配合使用。
public class DoOnErrorExample {private static final Logger log = LoggerFactory.getLogger(DoOnErrorExample.class);public static void main(String[] args) {Flux<String> dataStream = Flux.just("x", "y").map(data -> {if (data.equals("y")) {throw new RuntimeException("处理y时出错");}return data;});// doOnError仅记录日志,不处理错误;需配合onErrorReturn恢复流dataStream.doOnError(error -> {// 记录错误日志(副作用)log.error("捕获错误: {}", error.getMessage(), error);}).onErrorReturn("y的默认值") // 恢复流.subscribe(result -> System.out.println("接收: " + result),error -> System.err.println("不会执行,因为已恢复"));}
}
接收: x
[ERROR] 捕获错误: 处理y时出错
java.lang.RuntimeException: 处理y时出错...
接收: y的默认值
6.1.8 错误处理流程与策略
错误传播与处理流程
flowchart LRA[正常流] -->|发生错误| B[触发onError信号]B --> C{是否有错误处理操作符?}C -->|否| D[错误传递到订阅者,流终止]C -->|是| E[执行错误处理逻辑]E -->|恢复流(如onErrorReturn)| F[流继续并正常完成]E -->|未恢复(如onErrorMap)| G[错误转换后传递,流终止]E -->|重试(如retry)| H[重新订阅原始流]
错误处理策略选择
场景 | 推荐操作符 | 原因 |
---|---|---|
已知错误,需返回默认值 | onErrorReturn | 简单直接,适合静态默认值 |
错误类型多样,需动态备用流 | onErrorResume | 可根据错误类型返回不同备用流 |
需统一错误类型(如业务异常) | onErrorMap | 转换错误便于下游统一处理 |
临时错误(如网络波动) | retry /retryWhen | 重试可避免偶发错误导致失败 |
需记录错误但不中断流 | doOnError + 恢复操作符 | doOnError 仅记录,恢复操作符保证流继续 |
6.1.9 最佳实践
- 错误处理尽早原则
在流的上游处理错误,避免错误传递到下游多个订阅者重复处理。 - 区分可重试与不可重试错误
- 可重试:网络超时、服务暂时不可用(用
retryWhen
+ 过滤) - 不可重试:业务错误(如参数无效)、权限不足(直接处理不重试)
- 可重试:网络超时、服务暂时不可用(用
- 清理资源
错误发生时需释放资源(如数据库连接),可配合doFinally
:
Flux.using(() -> openConnection(), // 资源创建conn -> processData(conn), // 资源使用conn -> closeConnection(conn) // 资源释放(无论成功/失败)
).onErrorResume(...)
- 避免静默失败
即使使用onErrorReturn
,也建议用doOnError
记录错误,便于排查问题。
6.1.10 总结
Reactor 的错误处理操作符提供了从简单到复杂的完整解决方案:
- 简单恢复用
onErrorReturn
,动态恢复用onErrorResume
; - 错误转换用
onErrorMap
,便于业务统一处理; - 临时错误重试用
retry
(简单)或retryWhen
(复杂策略); - 错误通知用
doOnError
,需配合恢复操作符使用。
选择合适的操作符需结合业务场景(错误类型、是否可恢复、是否需重试),核心目标是确保流的稳定性,同时保留错误上下文便于排查。
6.2 背压处理
示例代码
import reactor.core.publisher.Flux;
import reactor.core.publisher.BaseSubscriber;
import org.reactivestreams.Subscription;
import java.time.Duration;public class BackpressureExamples {public void backpressureStrategies() {// 创建一个快速生产的FluxFlux<Integer> fastProducer = Flux.range(1, 1000).delayElements(Duration.ofMillis(10));// 策略1: 缓冲 (BUFFER) - 默认策略fastProducer.onBackpressureBuffer(50); // 指定缓冲区大小// 策略2: 丢弃最新值 (DROP)fastProducer.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped));// 策略3: 丢弃最旧值 (LATEST)fastProducer.onBackpressureLatest();// 策略4: 错误 (ERROR)fastProducer.onBackpressureError();}// 自定义处理背压public void customBackpressure() {Flux.range(1, 1000).subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int BATCH_SIZE = 10;@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 初始请求request(BATCH_SIZE);}@Overrideprotected void hookOnNext(Integer value) {// 处理元素process(value);count++;// 每处理BATCH_SIZE个元素后请求下一批if (count % BATCH_SIZE == 0) {request(BATCH_SIZE);}}@Overrideprotected void hookOnComplete() {System.out.println("Completed!");}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable.getMessage());}private void process(Integer value) {// 模拟处理try {Thread.sleep(20);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Processed: " + value);}});}
}
在 Reactor 响应式编程中,背压(Backpressure) 是解决 “生产者速度远快于消费者” 问题的核心机制。它允许消费者主动告知生产者自己的处理能力,通过动态调节数据请求量,避免数据积压导致的内存溢出或系统崩溃。Reactor 作为 Reactive Streams 规范的优秀实现,提供了完善的背压处理方案,涵盖从基础机制到高级策略的全场景支持。
6.2.1 背压的核心原理
1. 背压的本质
背压是一种流量控制机制:消费者通过request(n)
方法告知生产者 " 我现在能处理n
个元素 “,生产者根据该请求量调整数据发送速度。这种” 消费者主导 “的模式,与传统” 生产者推送 " 模式形成鲜明对比。
2. Reactor 中的背压交互模型
Reactor 通过Subscription
接口实现背压交互,核心流程如下:
- 消费者订阅生产者后,生产者创建
Subscription
对象并通过onSubscribe
传递给消费者; - 消费者调用
subscription.request(n)
,表示 " 请求n
个元素 "; - 生产者收到请求后,发送不超过
n
个元素(通过onNext
); - 消费者处理完部分元素后,再次调用
request(m)
请求更多元素,形成闭环。
6.2.2 Reactor 中的背压策略
当生产者速度超过消费者处理能力(即未处理元素数量超过消费者请求量)时,Reactor 提供了多种策略来处理超额数据,核心通过onBackpressureXXX
系列操作符实现。
1. 缓冲策略:onBackpressureBuffer()
- 原理:将超额数据存入缓冲区暂存,直到消费者请求新数据时再发送。
- 默认行为:Reactor 中大多数操作符默认使用缓冲区(如
Flux.range
),默认缓冲区大小为Queues.SMALL_BUFFER_SIZE
(256 个元素)。 - 适用场景:生产者速度偶尔超过消费者,且内存资源充足(可容忍短期缓冲)。
public class BackpressureBufferExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成100个元素(每10ms一个)Flux<Integer> fastProducer = Flux.range(1, 100).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(10)); // 快速生产// 消费者:处理速度慢(每100ms处理一个)fastProducer// 缓冲超额数据(默认大小256,这里显式指定以强调策略).onBackpressureBuffer(100, // 缓冲区大小dropped -> System.out.println("缓冲区满,丢弃: " + dropped), // 缓冲区满时的回调false // 满时是否抛出异常(false=丢弃)).publishOn(Schedulers.boundedElastic()) // 切换到消费者线程.doOnNext(num -> {try {// 模拟慢速处理Thread.sleep(100);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();// 等待所有处理完成Thread.sleep(15000);}
}
关键输出(缓冲区暂存超额数据):
生产: 1 → 生产: 2 → ... → 生产: 10(快速连续生产)
处理: 1(100ms后)
处理: 2(再100ms后)
...(生产者持续生产,缓冲区暂存,消费者按自己速度处理)
丢弃策略:
onBackpressureDrop()
- 原理:当缓冲区满时,直接丢弃新产生的元素(不存入缓冲区)。
- 扩展能力:可通过回调记录丢弃的元素,便于监控数据丢失情况。
- 适用场景:数据实时性要求高,允许丢失旧数据(如实时监控指标,可丢弃过期数据)。
public class BackpressureDropExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成20个元素Flux<Integer> fastProducer = Flux.range(1, 20).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(50)); // 50ms/个// 消费者:每200ms处理1个(速度仅为生产者的1/4)fastProducer// 缓冲区满时丢弃新元素,并记录丢弃的元素.onBackpressureDrop(dropped -> System.out.println("丢弃元素: " + dropped)).publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(200);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(5000);}
}
关键输出(超过处理能力的元素被丢弃):
生产: 1 → 生产: 2 → 生产: 3 → 生产: 4 → 生产: 5(快速生产)
处理: 1(200ms后)
生产: 6
丢弃元素: 6(缓冲区满,新元素被丢弃)
生产: 7
丢弃元素: 7
处理: 2(再200ms后)
...
3. 错误策略:onBackpressureError()
- 原理:当缓冲区满时,立即抛出
BufferOverflowException
,终止流的处理。 - 适用场景:数据不允许丢失,且希望快速发现流量不匹配问题(如关键业务数据处理)。
public class BackpressureErrorExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成10个元素Flux<Integer> fastProducer = Flux.range(1, 10).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(50));// 消费者:处理速度慢,且不允许数据丢失fastProducer// 缓冲区满时抛出错误.onBackpressureError().publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(200);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe(null,error -> System.err.println("背压错误: " + error.getMessage()) // 捕获缓冲区溢出错误);Thread.sleep(3000);}
}
关键输出(缓冲区满时立即报错):
生产: 1 → 生产: 2 → ... → 生产: 6(默认缓冲区满)
处理: 1
背压错误: Buffer overflow
4. 保留最新策略:
onBackpressureLatest()
- 原理:只保留最新产生的元素,当消费者请求时,直接发送最新元素(覆盖旧元素)。
- 适用场景:只需要最新数据,旧数据无意义(如实时股价、用户当前位置)。
public class BackpressureLatestExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成1-10的数字(模拟实时数据更新)Flux<Integer> fastProducer = Flux.range(1, 10).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(100));// 消费者:每500ms处理一次(只关心最新数据)fastProducer// 只保留最新元素.onBackpressureLatest().publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(500);System.out.println("处理(最新): " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(6000);}
}
关键输出(只处理最新元素,跳过中间值):
生产: 1 → 2 → 3 → 4 → 5(500ms内产生5个元素)
处理(最新): 5(只保留最新的5)
生产: 6 → 7 → 8 → 9 → 10(下一个500ms)
处理(最新): 10
6.2.3 热流与冷流的背压差异
Reactor 中的流分为冷流(Cold Stream) 和热流(Hot Stream),它们的背压处理方式存在本质区别:
- 冷流(如
Flux.range
、数据库查询)
- 特点:为每个订阅者单独生成数据(“一对一”),数据生成与订阅强关联。
- 背压支持:天然支持背压,生产者可根据消费者的
request(n)
动态调节数据生成速度(如按需从数据库拉取数据)。
- 热流(如
Flux.interval
、WebSocket 消息)
- 特点:数据生成独立于订阅(“一对多”),无论是否有订阅者都持续产生数据。
- 背压挑战:热流生产者无法感知单个消费者的处理能力,超额数据必须通过缓冲、丢弃等策略处理(如
onBackpressureDrop
)。
示例:热流的背压处理
// 热流:每100ms生成一个元素(独立于订阅)
Flux<Long> hotStream = Flux.interval(Duration.ofMillis(100)).share(); // 转为热流(多订阅者共享)// 消费者1:处理快(100ms/个)
hotStream.subscribe(num -> System.out.println("消费者1处理: " + num));// 消费者2:处理慢(500ms/个),需用背压策略
hotStream.onBackpressureDrop(dropped -> System.out.println("消费者2丢弃: " + dropped)).subscribe(num -> {try {Thread.sleep(500);System.out.println("消费者2处理: " + num);} catch (InterruptedException e) { /* 忽略 */ }});
6.2.4 背压监控与调度
Reactor 提供了工具监控背压状态,帮助排查流量不匹配问题:
1. metrics()
操作符:暴露流的关键指标(如元素数量、背压请求量)。
Flux.range(1, 100).metrics() // 启用指标收集.onBackpressureBuffer().subscribe();
2. doOnRequest
监控请求量:跟踪消费者的request(n)
调用。
Flux.range(1, 10).doOnRequest(n -> System.out.println("收到请求: " + n)).subscribe(num -> System.out.println("处理: " + num));
SubmissionPublisher
的estimatePending()
:估算未处理元素数量。
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber());
publisher.submit(1);
System.out.println("未处理元素: " + publisher.estimatePending()); // 输出未处理数量
6.2.5 背压最佳实践
-
根据业务场景选择策略:
- 不允许丢数据 →
onBackpressureBuffer
(需评估内存); - 实时性优先 →
onBackpressureLatest
或onBackpressureDrop
; - 严格流量控制 →
onBackpressureError
(快速失败)。
- 不允许丢数据 →
-
控制缓冲区大小:默认缓冲区(256)可能不适合高并发场景,需根据内存和吞吐量调整
.onBackpressureBuffer(1024) // 调整缓冲区为1024个元素
-
避免在消费者中执行阻塞操作:阻塞会导致背压请求延迟,间接引发数据积压(应使用
publishOn
切换到专用线程池)。 -
热流必须显式处理背压:热流无法天然响应背压,需通过
onBackpressureXXX
明确策略,否则可能丢失数据而不报错。 -
结合限流操作符:对于突发流量,可先用
limitRate(n)
限制生产者速度,减轻背压压力
6.2.6 limitRate
在 Reactor 中,limitRate(n)
是一个用于控制生产者发送速率的操作符,它通过限制每次请求的元素数量,间接平衡生产者和消费者的速度,是【背压机制的重要补充
】。
与onBackpressureXXX
系列操作符(消费者侧处理)不同,limitRate
从生产者侧主动限制流量,避免一次性发送过多元素导致消费者处理压力过大。
核心作用
limitRate
的本质是拆分请求:当消费者调用request(m)
请求m
个元素时,limitRate
会将这个大请求拆分为多个小请求,每次向上游生产者请求n
个元素(n
为limitRate
的参数),从而控制元素流入下游的速度。
- 默认行为:如果不使用
limitRate
,消费者的一次request(1000)
会直接传递给上游,可能导致上游一次性发送 1000 个元素,引发下游处理压力。 limitRate
效果:若设置limitRate(100)
,则request(1000)
会被拆分为 10 次request(100)
,上游每次最多发送 100 个元素,下游分批处理。
两个重载方法
limitRate(int prefetch)
:单参数版本,prefetch
为每次向上游请求的元素数量(默认预取阈值为prefetch/2
)。limitRate(int prefetch, int lowTide)
:双参数版本,prefetch
为每次请求量,lowTide
为触发新请求的阈值(当剩余元素少于lowTide
时,自动请求下一批)。
示例说明
- 无
limitRate
的情况(可能会导致元素突发)
// 上游:快速生成1000个元素
Flux.range(1, 1000).doOnRequest(n -> System.out.println("上游收到请求: " + n)).subscribe(num -> { /* 处理元素 */ },error -> {},() -> System.out.println("处理完成"));
输出(一次性请求所有元素):
上游收到请求: 9223372036854775807 // 默认请求Long.MAX_VALUE
处理完成
- 使用
limitRate
的情况(分批请求)
public class LimitRateExample {public static void main(String[] args) {// 上游:生成1000个元素Flux<Integer> upstream = Flux.range(1, 1000).doOnRequest(n -> System.out.println("上游收到请求: " + n)); // 监控上游请求// 使用limitRate(100):每次向上游请求100个元素upstream.limitRate(100).subscribe(num -> {// 模拟消费者处理(不打印所有元素,只关注请求逻辑)},error -> {},() -> System.out.println("处理完成"));}
}
输出(分批请求,每次 100 个):
上游收到请求: 100 // 第一次请求
上游收到请求: 100 // 当剩余元素少于50(100/2)时,自动请求下一批
上游收到请求: 100
...(重复直到1000个元素)
处理完成
-
limitRate适用场景
-
上游生产者无法感知下游背压:如某些第三方库或 legacy 系统,需通过
limitRate
主动限制发送速度。 -
避免大批次元素处理导致的 GC 压力:分批处理可减少内存占用,降低垃圾回收频率。
-
平滑流量波动:在突发流量场景下,
limitRate
可将集中请求拆分为平稳的小批次请求。 -
与热流配合:热流(如
Flux.interval
)无法响应背压,limitRate
可控制其流入下游的速度。
6.2.7 limitRate + 背压策略
limitRate
通常与背压策略配合使用,形成 “上游限流 + 下游缓冲” 的双层保护:
limitRate
控制上游发送速度,避免一次性涌入过多元素;onBackpressureBuffer
在下游缓冲少量超额元素,应对短期速度波动。
public class LimitRateWithBackpressure {public static void main(String[] args) throws InterruptedException {// 上游:快速生成元素(每10ms一个)Flux<Integer> fastUpstream = Flux.range(1, 200).doOnRequest(n -> System.out.println("上游请求: " + n)).delayElements(Duration.ofMillis(10));// 下游:处理较慢(每100ms一个)fastUpstream.limitRate(20) // 每次向上游请求20个,控制流入速度.onBackpressureBuffer(30) // 下游缓冲30个,应对短期波动.publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(100); // 模拟慢速处理System.out.println("处理元素: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(20000);}
}
6.2.8 与类似操作符的区别
操作符 | 核心作用 | 适用场景 |
---|---|---|
limitRate(n) | 拆分请求,控制上游发送速率(每次请求n 个) | 平滑流量,避免大批次元素 |
limitRequest(n) | 限制下游总请求量(最多请求n 个元素) | 仅需前n 个元素的场景 |
onBackpressureBuffer(n) | 下游缓冲超额元素(最多缓冲n 个) | 处理短期速度不匹配 |
throttleFirst(Duration) | 单位时间内只保留第一个元素 | 限流高频事件(如点击) |
6.2.9 最佳实践
- 合理设置
prefetch
值:- 过小(如 10):请求次数过多,增加开销;
- 过大(如 1000):可能失去限流效果;
- 建议:根据下游处理能力设置(如下游每秒处理 100 个,则
prefetch
设为 100-200)。
- 双参数版本更灵活:当元素处理时间不稳定时,用
limitRate(prefetch, lowTide)
调整触发新请求的阈值:
.limitRate(200, 50) // 每次请求200个,剩余50个时触发下一次请求
- 与
publishOn
配合使用:publishOn
会请求一批元素到自己的缓冲区,limitRate
应放在publishOn
上游,避免缓冲区积压:
// 正确:limitRate在上游,控制流入publishOn缓冲区的速度
upstream.limitRate(100).publishOn(...)// 错误:limitRate在下游,无法控制publishOn的缓冲区
upstream.publishOn(...).limitRate(100)
6.2.10 总结
Reactor 的背压机制通过 “消费者主动请求” 模式,解决了生产者与消费者的速度不匹配问题,其核心价值在于:
- 流量可控:避免消费者被压垮,保障系统稳定性;
- 策略灵活:提供缓冲、丢弃、错误、保留最新等策略,适配不同业务场景;
- 兼容性:严格遵循 Reactive Streams 规范,可与其他响应式库(如 RxJava)协同工作。
在实践中,需根据流的类型(冷 / 热)和业务需求(数据重要性、实时性)选择合适的背压策略,同时通过监控工具持续优化流量控制,确保系统在高并发场景下的可靠性。