当前位置: 首页 > ds >正文

响应式编程框架Reactor【3】

文章目录

  • 六、错误处理与背压
    • 6.1 错误处理操作符
      • 6.1.1 onErrorReturn - 发生错误时返回默认值
      • 6.1.2 onErrorResume - 发生错误时切换到备用流
      • 6.1.3 onErrorContinue
      • 6.1.4 retry - 重试操作
      • 6.1.5 timeout - 操作超时处理
      • 6.1.6 retryWhen
      • 6.1.7 doOnError
      • 6.1.8 错误处理流程与策略
      • 6.1.9 最佳实践
      • 6.1.10 总结
    • 6.2 背压处理
      • 6.2.1 背压的核心原理
      • 6.2.2 Reactor 中的背压策略
      • 6.2.3 热流与冷流的背压差异
      • 6.2.4 背压监控与调度
      • 6.2.5 背压最佳实践
      • 6.2.6 limitRate
      • 6.2.7 limitRate + 背压策略
      • 6.2.8 与类似操作符的区别
      • 6.2.9 最佳实践
      • 6.2.10 总结

六、错误处理与背压

6.1 错误处理操作符

package cn.tcmeta;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;import java.time.Duration;public class ErrorHandlingExamples {public void errorHandlingOperators() {Flux<Integer> numbers = Flux.range(1, 5).map(n -> {if (n == 3) throw new RuntimeException("Boom on 3!");return n;});// onErrorReturn - 发生错误时返回默认值Flux<Integer> withDefault = numbers.onErrorReturn(0);// onErrorResume - 发生错误时切换到备用流Flux<Integer> withFallback = numbers.onErrorResume(e ->Flux.just(9, 8, 7));// onErrorContinue - 发生错误时继续处理(跳过错误元素)Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->System.out.println("Error with value " + value + ": " + e.getMessage()));// retry - 重试操作Flux<Integer> withRetry = numbers.retry(2); // 最多重试2次// retryWhen - 基于条件的重试Flux<Integer> withConditionalRetry = numbers.retryWhen(Retry.withThrowable(retries -> retries.zipWith(Flux.range(1, 3),(error, index) -> {if (index < 3) {return Duration.ofSeconds(index); // 指数退避} else {throw new RuntimeException("Retries exhausted", error);}})));// timeout - 操作超时处理Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1)).timeout(Duration.ofMillis(500)).onErrorResume(e -> Flux.just(-1));}public void errorHandlingInPractice() {// 模拟外部服务调用Mono<String> externalServiceCall = Mono.fromCallable(() -> {if (Math.random() > 0.7) {throw new RuntimeException("Service unavailable");}return "Service response";});// 添加弹性模式Mono<String> resilientCall = externalServiceCall.timeout(Duration.ofSeconds(2)).retryWhen(Retry.backoff(3, Duration.ofMillis(100)).doBeforeRetry(e -> Mono.just("Fallback response")));resilientCall.subscribe(System.out::println,error -> System.err.println("Unexpected error: " + error),() -> System.out.println("Completed"));}
}

在 Reactor 响应式编程中,错误处理是确保流稳定性的核心环节。与传统编程的异常处理不同,Reactor 的错误具有终止性:一旦流中发生错误(通过onError信号),当前流会立即终止,后续元素不再发射。

错误处理操作符的作用是捕获错误、恢复流、转换错误或重试,避免错误直接传递到订阅者导致整个流程中断。

Reactor 错误处理操作符主要解决四类问题:

  1. 错误恢复:发生错误时,用默认值或备用流继续处理
  2. 错误转换:将原始错误转换为更有意义的业务错误
  3. 错误重试:对临时错误(如网络波动)进行重试
  4. 错误通知:仅记录错误不中断流(需配合恢复机制)

6.1.1 onErrorReturn - 发生错误时返回默认值

public final Flux<T> onErrorReturn(T fallbackValue) {Objects.requireNonNull(fallbackValue, "fallbackValue must not be null");return onAssembly(new FluxOnErrorReturn<>(this, null, fallbackValue));
}

这类操作符在捕获错误后,会生成新的元素或流,使整个流能够正常完成(而非错误终止)。

  • 作用:发生错误时,返回一个预设的默认值,流以该值结束并正常完成。
  • 特点:简单直接,适合已知错误且有明确默认值的场景。
public class OnErrorReturnExample {public static void main(String[] args) {// 模拟一个可能出错的流(第3个元素出错)Flux<Integer> errorProneFlux = Flux.range(1, 5).map(num -> {if (num == 3) {throw new RuntimeException("处理数字3时出错");}return num;});// 使用onErrorReturn:出错时返回默认值0errorProneFlux.onErrorReturn(0).subscribe(num -> System.out.println("接收元素: " + num),error -> System.err.println("未被触发的错误处理"), // 不会执行() -> System.out.println("流正常完成") // 会执行);}
}
接收元素: 1
接收元素: 2
接收元素: 0  // 错误发生时返回的默认值
流正常完成   // 流正常结束,而非错误终止

6.1.2 onErrorResume - 发生错误时切换到备用流

public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {return onAssembly(new FluxOnErrorResume<>(this, fallback));
}
  • 作用:发生错误时,切换到一个备用流(Publisher),用备用流的元素继续处理。
  • 特点:比onErrorReturn更灵活,可根据错误类型返回不同的备用流。
public class OnErrorResumeExample {public static void main(String[] args) {// 模拟可能发生两种错误的流Flux<String> dataStream = Flux.just("a", "b", "c").map(data -> {if (data.equals("b")) {throw new IllegalArgumentException("无效数据: b"); // 业务错误}if (data.equals("c")) {throw new RuntimeException("系统错误"); // 系统错误}return data.toUpperCase();});// 使用onErrorResume:根据错误类型返回不同备用流dataStream.onErrorResume(error -> {// 对业务错误返回特定备用流if (error instanceof IllegalArgumentException) {return Flux.just("B(备用)", "B1(备用)");}// 对系统错误返回默认备用流else {return Mono.just("系统错误备用值");}}).subscribe(result -> System.out.println("接收结果: " + result),error -> System.err.println("未被触发的错误处理"),() -> System.out.println("流正常完成"));}
}
接收结果: A
接收结果: B(备用)
接收结果: B1(备用)
流正常完成

6.1.3 onErrorContinue

  • 发生错误时继续处理(跳过错误元素)
public final Flux<T> onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) {BiConsumer<Throwable, Object> genericConsumer = errorConsumer;return contextWriteSkippingContextPropagation(Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,OnNextFailureStrategy.resume(genericConsumer)));
}
// onErrorContinue - 发生错误时继续处理(跳过错误元素)
Flux<Integer> withContinue = numbers.onErrorContinue((e, value) ->System.out.println("Error with value " + value + ": " + e.getMessage())
);

6.1.4 retry - 重试操作

public final Flux<T> retry(long numRetries) {return onAssembly(new FluxRetry<>(this, numRetries));
}
  • 作用:发生错误时,重新订阅原始流,最多重试numRetries次。
  • 特点:简单重试,不区分错误类型,适合已知偶发错误的场景。
public class RetryExample {public static void main(String[] args) {AtomicInteger attemptCount = new AtomicInteger(0); // 记录尝试次数// 模拟可能临时失败的操作(前2次失败,第3次成功)Flux<String> operation = Flux.defer(() -> {int attempt = attemptCount.incrementAndGet();System.out.println("执行第" + attempt + "次尝试");if (attempt < 3) {throw new RuntimeException("第" + attempt + "次尝试失败(临时错误)");}return Flux.just("第" + attempt + "次尝试成功");});// 使用retry(2):最多重试2次(总尝试次数=1+2=3)operation.retry(2) // 允许重试2次.subscribe(result -> System.out.println("成功: " + result),error -> System.err.println("最终失败: " + error.getMessage()));}
}
执行第1次尝试
执行第2次尝试(重试1)
执行第3次尝试(重试2)
成功:3次尝试成功

6.1.5 timeout - 操作超时处理

public final Flux<T> timeout(Duration timeout) {return timeout(timeout, null, Schedulers.parallel());
}
Flux<Integer> withTimeout = numbers.delayElements(Duration.ofSeconds(1)).timeout(Duration.ofMillis(500)) // timeout - 操作超时处理.onErrorResume(e -> Flux.just(-1));

6.1.6 retryWhen

public final Flux<T> retryWhen(Retry retrySpec) {return onAssembly(new FluxRetryWhen<>(this, retrySpec));
}
  • 作用:基于错误信号流(Flux<Throwable>)动态控制重试策略(如指数退避、条件重试)。
  • 特点:高度灵活,支持复杂重试逻辑(如根据错误类型决定是否重试、设置重试间隔)。
public class RetryWhenExample {public static void main(String[] args) throws InterruptedException {AtomicInteger attemptCount = new AtomicInteger(0);// 模拟网络请求(前3次失败,第4次成功)Flux<String> networkCall = Flux.defer(() -> {int attempt = attemptCount.incrementAndGet();System.out.println("执行第" + attempt + "次网络请求");if (attempt < 4) {throw new RuntimeException("网络超时");}return Flux.just("请求成功,数据: {id:1}");});// 定义指数退避重试策略:最多重试3次,间隔依次为1s、2s、4sRetry retryStrategy = Retry.backoff(3, Duration.ofSeconds(1)).jitter(0.1) // 增加10%随机抖动,避免重试风暴.filter(error -> error instanceof RuntimeException) // 只对特定错误重试.onRetryExhaustedThrow((retrySpec, signal) -> new RuntimeException("重试耗尽,最终失败", signal.failure()));// 使用retryWhen应用策略networkCall.retryWhen(retryStrategy).subscribe(result -> System.out.println("接收结果: " + result),error -> System.err.println("最终错误: " + error.getMessage()));// 等待所有重试完成(总耗时≈1+2+4=7秒)Thread.sleep(8000);}
}

输出结果(间隔随重试次数指数增长):

执行第1次网络请求
执行第2次网络请求(1秒后重试)
执行第3次网络请求(2秒后重试)
执行第4次网络请求(4秒后重试)
接收结果: 请求成功,数据: {id:1}

6.1.7 doOnError

  • 作用:错误发生时执行副作用(如日志记录、报警),但不处理错误本身。
  • 特点:流仍会以错误终止,需与onErrorResume等配合使用。
public class DoOnErrorExample {private static final Logger log = LoggerFactory.getLogger(DoOnErrorExample.class);public static void main(String[] args) {Flux<String> dataStream = Flux.just("x", "y").map(data -> {if (data.equals("y")) {throw new RuntimeException("处理y时出错");}return data;});// doOnError仅记录日志,不处理错误;需配合onErrorReturn恢复流dataStream.doOnError(error -> {// 记录错误日志(副作用)log.error("捕获错误: {}", error.getMessage(), error);}).onErrorReturn("y的默认值") // 恢复流.subscribe(result -> System.out.println("接收: " + result),error -> System.err.println("不会执行,因为已恢复"));}
}
接收: x
[ERROR] 捕获错误: 处理y时出错
java.lang.RuntimeException: 处理y时出错...
接收: y的默认值

6.1.8 错误处理流程与策略

错误传播与处理流程

flowchart LRA[正常流] -->|发生错误| B[触发onError信号]B --> C{是否有错误处理操作符?}C -->|否| D[错误传递到订阅者,流终止]C -->|是| E[执行错误处理逻辑]E -->|恢复流(如onErrorReturn)| F[流继续并正常完成]E -->|未恢复(如onErrorMap)| G[错误转换后传递,流终止]E -->|重试(如retry)| H[重新订阅原始流]

错误处理策略选择

场景推荐操作符原因
已知错误,需返回默认值onErrorReturn简单直接,适合静态默认值
错误类型多样,需动态备用流onErrorResume可根据错误类型返回不同备用流
需统一错误类型(如业务异常)onErrorMap转换错误便于下游统一处理
临时错误(如网络波动)retry/retryWhen重试可避免偶发错误导致失败
需记录错误但不中断流doOnError + 恢复操作符doOnError仅记录,恢复操作符保证流继续

6.1.9 最佳实践

  1. 错误处理尽早原则
    在流的上游处理错误,避免错误传递到下游多个订阅者重复处理。
  2. 区分可重试与不可重试错误
    • 可重试:网络超时、服务暂时不可用(用retryWhen+ 过滤)
    • 不可重试:业务错误(如参数无效)、权限不足(直接处理不重试)
  3. 清理资源
    错误发生时需释放资源(如数据库连接),可配合doFinally
Flux.using(() -> openConnection(), // 资源创建conn -> processData(conn), // 资源使用conn -> closeConnection(conn) // 资源释放(无论成功/失败)
).onErrorResume(...)
  1. 避免静默失败
    即使使用onErrorReturn,也建议用doOnError记录错误,便于排查问题。

6.1.10 总结

Reactor 的错误处理操作符提供了从简单到复杂的完整解决方案:

  • 简单恢复用onErrorReturn,动态恢复用onErrorResume
  • 错误转换用onErrorMap,便于业务统一处理;
  • 临时错误重试用retry(简单)或retryWhen(复杂策略);
  • 错误通知用doOnError,需配合恢复操作符使用。

选择合适的操作符需结合业务场景(错误类型、是否可恢复、是否需重试),核心目标是确保流的稳定性,同时保留错误上下文便于排查

6.2 背压处理

示例代码

import reactor.core.publisher.Flux;
import reactor.core.publisher.BaseSubscriber;
import org.reactivestreams.Subscription;
import java.time.Duration;public class BackpressureExamples {public void backpressureStrategies() {// 创建一个快速生产的FluxFlux<Integer> fastProducer = Flux.range(1, 1000).delayElements(Duration.ofMillis(10));// 策略1: 缓冲 (BUFFER) - 默认策略fastProducer.onBackpressureBuffer(50); // 指定缓冲区大小// 策略2: 丢弃最新值 (DROP)fastProducer.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped));// 策略3: 丢弃最旧值 (LATEST)fastProducer.onBackpressureLatest();// 策略4: 错误 (ERROR)fastProducer.onBackpressureError();}// 自定义处理背压public void customBackpressure() {Flux.range(1, 1000).subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int BATCH_SIZE = 10;@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 初始请求request(BATCH_SIZE);}@Overrideprotected void hookOnNext(Integer value) {// 处理元素process(value);count++;// 每处理BATCH_SIZE个元素后请求下一批if (count % BATCH_SIZE == 0) {request(BATCH_SIZE);}}@Overrideprotected void hookOnComplete() {System.out.println("Completed!");}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable.getMessage());}private void process(Integer value) {// 模拟处理try {Thread.sleep(20);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Processed: " + value);}});}
}

在 Reactor 响应式编程中,背压(Backpressure) 是解决 “生产者速度远快于消费者” 问题的核心机制。它允许消费者主动告知生产者自己的处理能力,通过动态调节数据请求量,避免数据积压导致的内存溢出或系统崩溃。Reactor 作为 Reactive Streams 规范的优秀实现,提供了完善的背压处理方案,涵盖从基础机制到高级策略的全场景支持。

6.2.1 背压的核心原理

1. 背压的本质

背压是一种流量控制机制:消费者通过request(n)方法告知生产者 " 我现在能处理n个元素 “,生产者根据该请求量调整数据发送速度。这种” 消费者主导 “的模式,与传统” 生产者推送 " 模式形成鲜明对比。

2. Reactor 中的背压交互模型

Reactor 通过Subscription接口实现背压交互,核心流程如下:

  • 消费者订阅生产者后,生产者创建Subscription对象并通过onSubscribe传递给消费者;
  • 消费者调用subscription.request(n),表示 " 请求n个元素 ";
  • 生产者收到请求后,发送不超过n个元素(通过onNext);
  • 消费者处理完部分元素后,再次调用request(m)请求更多元素,形成闭环。
生产者Subscription消费者订阅(subscribe)调用onSubscribe(Subscription)request(2) // 初始请求2个元素转发请求onNext(元素1)onNext(元素2)处理元素1request(1) // 再请求1个onNext(元素3)处理元素2request(1) // 再请求1个onNext(元素4)...持续到流结束...生产者Subscription消费者

6.2.2 Reactor 中的背压策略

当生产者速度超过消费者处理能力(即未处理元素数量超过消费者请求量)时,Reactor 提供了多种策略来处理超额数据,核心通过onBackpressureXXX系列操作符实现。

1. 缓冲策略:onBackpressureBuffer()

  • 原理:将超额数据存入缓冲区暂存,直到消费者请求新数据时再发送。
  • 默认行为:Reactor 中大多数操作符默认使用缓冲区(如Flux.range),默认缓冲区大小为Queues.SMALL_BUFFER_SIZE(256 个元素)。
  • 适用场景:生产者速度偶尔超过消费者,且内存资源充足(可容忍短期缓冲)。
public class BackpressureBufferExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成100个元素(每10ms一个)Flux<Integer> fastProducer = Flux.range(1, 100).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(10)); // 快速生产// 消费者:处理速度慢(每100ms处理一个)fastProducer// 缓冲超额数据(默认大小256,这里显式指定以强调策略).onBackpressureBuffer(100, // 缓冲区大小dropped -> System.out.println("缓冲区满,丢弃: " + dropped), // 缓冲区满时的回调false // 满时是否抛出异常(false=丢弃)).publishOn(Schedulers.boundedElastic()) // 切换到消费者线程.doOnNext(num -> {try {// 模拟慢速处理Thread.sleep(100);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();// 等待所有处理完成Thread.sleep(15000);}
}

关键输出(缓冲区暂存超额数据):

生产: 1 → 生产: 2... → 生产: 10(快速连续生产)
处理: 1100ms后)
处理: 2(再100ms后)
...(生产者持续生产,缓冲区暂存,消费者按自己速度处理)
  1. 丢弃策略:onBackpressureDrop()
  • 原理:当缓冲区满时,直接丢弃新产生的元素(不存入缓冲区)。
  • 扩展能力:可通过回调记录丢弃的元素,便于监控数据丢失情况。
  • 适用场景:数据实时性要求高,允许丢失旧数据(如实时监控指标,可丢弃过期数据)。
public class BackpressureDropExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成20个元素Flux<Integer> fastProducer = Flux.range(1, 20).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(50)); // 50ms/个// 消费者:每200ms处理1个(速度仅为生产者的1/4)fastProducer// 缓冲区满时丢弃新元素,并记录丢弃的元素.onBackpressureDrop(dropped -> System.out.println("丢弃元素: " + dropped)).publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(200);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(5000);}
}

关键输出(超过处理能力的元素被丢弃):

生产: 1 → 生产: 2 → 生产: 3 → 生产: 4 → 生产: 5(快速生产)
处理: 1200ms后)
生产: 6
丢弃元素: 6(缓冲区满,新元素被丢弃)
生产: 7
丢弃元素: 7
处理: 2(再200ms后)
...

3. 错误策略:onBackpressureError()

  • 原理:当缓冲区满时,立即抛出BufferOverflowException,终止流的处理。
  • 适用场景:数据不允许丢失,且希望快速发现流量不匹配问题(如关键业务数据处理)。
public class BackpressureErrorExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成10个元素Flux<Integer> fastProducer = Flux.range(1, 10).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(50));// 消费者:处理速度慢,且不允许数据丢失fastProducer// 缓冲区满时抛出错误.onBackpressureError().publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(200);System.out.println("处理: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe(null,error -> System.err.println("背压错误: " + error.getMessage()) // 捕获缓冲区溢出错误);Thread.sleep(3000);}
}

关键输出(缓冲区满时立即报错):

生产: 1 → 生产: 2... → 生产: 6(默认缓冲区满)
处理: 1
背压错误: Buffer overflow

4. 保留最新策略:onBackpressureLatest()

  • 原理:只保留最新产生的元素,当消费者请求时,直接发送最新元素(覆盖旧元素)。
  • 适用场景:只需要最新数据,旧数据无意义(如实时股价、用户当前位置)。
public class BackpressureLatestExample {public static void main(String[] args) throws InterruptedException {// 生产者:快速生成1-10的数字(模拟实时数据更新)Flux<Integer> fastProducer = Flux.range(1, 10).doOnNext(num -> System.out.println("生产: " + num)).delayElements(Duration.ofMillis(100));// 消费者:每500ms处理一次(只关心最新数据)fastProducer// 只保留最新元素.onBackpressureLatest().publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(500);System.out.println("处理(最新): " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(6000);}
}

关键输出(只处理最新元素,跳过中间值):

生产: 12345500ms内产生5个元素)
处理(最新): 5(只保留最新的5)
生产: 678910(下一个500ms)
处理(最新): 10

6.2.3 热流与冷流的背压差异

Reactor 中的流分为冷流(Cold Stream)热流(Hot Stream),它们的背压处理方式存在本质区别:

  1. 冷流(如Flux.range、数据库查询)
  • 特点:为每个订阅者单独生成数据(“一对一”),数据生成与订阅强关联。
  • 背压支持:天然支持背压,生产者可根据消费者的request(n)动态调节数据生成速度(如按需从数据库拉取数据)。
  1. 热流(如Flux.interval、WebSocket 消息)
  • 特点:数据生成独立于订阅(“一对多”),无论是否有订阅者都持续产生数据。
  • 背压挑战:热流生产者无法感知单个消费者的处理能力,超额数据必须通过缓冲、丢弃等策略处理(如onBackpressureDrop)。

示例:热流的背压处理

// 热流:每100ms生成一个元素(独立于订阅)
Flux<Long> hotStream = Flux.interval(Duration.ofMillis(100)).share(); // 转为热流(多订阅者共享)// 消费者1:处理快(100ms/个)
hotStream.subscribe(num -> System.out.println("消费者1处理: " + num));// 消费者2:处理慢(500ms/个),需用背压策略
hotStream.onBackpressureDrop(dropped -> System.out.println("消费者2丢弃: " + dropped)).subscribe(num -> {try {Thread.sleep(500);System.out.println("消费者2处理: " + num);} catch (InterruptedException e) { /* 忽略 */ }});

6.2.4 背压监控与调度

Reactor 提供了工具监控背压状态,帮助排查流量不匹配问题:

1. metrics()操作符:暴露流的关键指标(如元素数量、背压请求量)。

Flux.range(1, 100).metrics() // 启用指标收集.onBackpressureBuffer().subscribe();

2. doOnRequest监控请求量:跟踪消费者的request(n)调用。

Flux.range(1, 10).doOnRequest(n -> System.out.println("收到请求: " + n)).subscribe(num -> System.out.println("处理: " + num));
  1. SubmissionPublisherestimatePending():估算未处理元素数量。
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber());
publisher.submit(1);
System.out.println("未处理元素: " + publisher.estimatePending()); // 输出未处理数量

6.2.5 背压最佳实践

  1. 根据业务场景选择策略

    • 不允许丢数据 → onBackpressureBuffer(需评估内存);
    • 实时性优先 → onBackpressureLatestonBackpressureDrop
    • 严格流量控制 → onBackpressureError(快速失败)。
  2. 控制缓冲区大小:默认缓冲区(256)可能不适合高并发场景,需根据内存和吞吐量调整

.onBackpressureBuffer(1024) // 调整缓冲区为1024个元素
  1. 避免在消费者中执行阻塞操作:阻塞会导致背压请求延迟,间接引发数据积压(应使用publishOn切换到专用线程池)。

  2. 热流必须显式处理背压:热流无法天然响应背压,需通过onBackpressureXXX明确策略,否则可能丢失数据而不报错。

  3. 结合限流操作符:对于突发流量,可先用limitRate(n)限制生产者速度,减轻背压压力

6.2.6 limitRate

在 Reactor 中,limitRate(n)是一个用于控制生产者发送速率的操作符,它通过限制每次请求的元素数量,间接平衡生产者和消费者的速度,是【背压机制的重要补充】。

onBackpressureXXX系列操作符(消费者侧处理)不同,limitRate生产者侧主动限制流量,避免一次性发送过多元素导致消费者处理压力过大。

核心作用

limitRate的本质是拆分请求:当消费者调用request(m)请求m个元素时,limitRate会将这个大请求拆分为多个小请求,每次向上游生产者请求n个元素(nlimitRate的参数),从而控制元素流入下游的速度。

  • 默认行为:如果不使用limitRate,消费者的一次request(1000)会直接传递给上游,可能导致上游一次性发送 1000 个元素,引发下游处理压力。
  • limitRate效果:若设置limitRate(100),则request(1000)会被拆分为 10 次request(100),上游每次最多发送 100 个元素,下游分批处理。

两个重载方法

  • limitRate(int prefetch):单参数版本,prefetch为每次向上游请求的元素数量(默认预取阈值为prefetch/2)。
  • limitRate(int prefetch, int lowTide):双参数版本,prefetch为每次请求量,lowTide为触发新请求的阈值(当剩余元素少于lowTide时,自动请求下一批)。
下游消费者limitRate操作符上游生产者request(500) // 消费者请求500个元素limitRate(100):每次向上游请求100个request(100) // 第一次请求100个发送100个元素转发100个元素处理元素,剩余元素逐渐减少剩余元素<50(lowTide=100/2)request(100) // 自动请求下一批发送100个元素转发100个元素重复直到满足500个元素请求下游消费者limitRate操作符上游生产者

示例说明

  1. limitRate的情况(可能会导致元素突发)
// 上游:快速生成1000个元素
Flux.range(1, 1000).doOnRequest(n -> System.out.println("上游收到请求: " + n)).subscribe(num -> { /* 处理元素 */ },error -> {},() -> System.out.println("处理完成"));

输出(一次性请求所有元素):

上游收到请求: 9223372036854775807  // 默认请求Long.MAX_VALUE
处理完成
  1. 使用limitRate的情况(分批请求)
public class LimitRateExample {public static void main(String[] args) {// 上游:生成1000个元素Flux<Integer> upstream = Flux.range(1, 1000).doOnRequest(n -> System.out.println("上游收到请求: " + n)); // 监控上游请求// 使用limitRate(100):每次向上游请求100个元素upstream.limitRate(100).subscribe(num -> {// 模拟消费者处理(不打印所有元素,只关注请求逻辑)},error -> {},() -> System.out.println("处理完成"));}
}

输出(分批请求,每次 100 个):

上游收到请求: 100  // 第一次请求
上游收到请求: 100  // 当剩余元素少于50(100/2)时,自动请求下一批
上游收到请求: 100
...(重复直到1000个元素)
处理完成
  1. limitRate适用场景

  2. 上游生产者无法感知下游背压:如某些第三方库或 legacy 系统,需通过limitRate主动限制发送速度。

  3. 避免大批次元素处理导致的 GC 压力:分批处理可减少内存占用,降低垃圾回收频率。

  4. 平滑流量波动:在突发流量场景下,limitRate可将集中请求拆分为平稳的小批次请求。

  5. 与热流配合:热流(如Flux.interval)无法响应背压,limitRate可控制其流入下游的速度。

6.2.7 limitRate + 背压策略

limitRate通常与背压策略配合使用,形成 “上游限流 + 下游缓冲” 的双层保护:

  • limitRate控制上游发送速度,避免一次性涌入过多元素;
  • onBackpressureBuffer在下游缓冲少量超额元素,应对短期速度波动。
public class LimitRateWithBackpressure {public static void main(String[] args) throws InterruptedException {// 上游:快速生成元素(每10ms一个)Flux<Integer> fastUpstream = Flux.range(1, 200).doOnRequest(n -> System.out.println("上游请求: " + n)).delayElements(Duration.ofMillis(10));// 下游:处理较慢(每100ms一个)fastUpstream.limitRate(20) // 每次向上游请求20个,控制流入速度.onBackpressureBuffer(30) // 下游缓冲30个,应对短期波动.publishOn(Schedulers.boundedElastic()).doOnNext(num -> {try {Thread.sleep(100); // 模拟慢速处理System.out.println("处理元素: " + num);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).subscribe();Thread.sleep(20000);}
}

6.2.8 与类似操作符的区别

操作符核心作用适用场景
limitRate(n)拆分请求,控制上游发送速率(每次请求n个)平滑流量,避免大批次元素
limitRequest(n)限制下游总请求量(最多请求n个元素)仅需前n个元素的场景
onBackpressureBuffer(n)下游缓冲超额元素(最多缓冲n个)处理短期速度不匹配
throttleFirst(Duration)单位时间内只保留第一个元素限流高频事件(如点击)

6.2.9 最佳实践

  1. 合理设置prefetch
    • 过小(如 10):请求次数过多,增加开销;
    • 过大(如 1000):可能失去限流效果;
    • 建议:根据下游处理能力设置(如下游每秒处理 100 个,则prefetch设为 100-200)。
  2. 双参数版本更灵活:当元素处理时间不稳定时,用limitRate(prefetch, lowTide)调整触发新请求的阈值:
.limitRate(200, 50) // 每次请求200个,剩余50个时触发下一次请求
  1. publishOn配合使用publishOn会请求一批元素到自己的缓冲区,limitRate应放在publishOn上游,避免缓冲区积压:
// 正确:limitRate在上游,控制流入publishOn缓冲区的速度
upstream.limitRate(100).publishOn(...)// 错误:limitRate在下游,无法控制publishOn的缓冲区
upstream.publishOn(...).limitRate(100)

6.2.10 总结

Reactor 的背压机制通过 “消费者主动请求” 模式,解决了生产者与消费者的速度不匹配问题,其核心价值在于:

  • 流量可控:避免消费者被压垮,保障系统稳定性;
  • 策略灵活:提供缓冲、丢弃、错误、保留最新等策略,适配不同业务场景;
  • 兼容性:严格遵循 Reactive Streams 规范,可与其他响应式库(如 RxJava)协同工作。

在实践中,需根据流的类型(冷 / 热)和业务需求(数据重要性、实时性)选择合适的背压策略,同时通过监控工具持续优化流量控制,确保系统在高并发场景下的可靠性。

http://www.xdnf.cn/news/19251.html

相关文章:

  • 【物联网】关于 GATT (Generic Attribute Profile)基本概念与三种操作(Read / Write / Notify)的理解
  • OpenAI Sora深度解析:AI视频生成技术如何重塑广告电商行业?影业合作已落地
  • WebGIS开发智慧校园(8)地图控件
  • 【实时Linux实战系列】实时自动化测试框架
  • [vmware][ubuntu]一个linux调用摄像头截图demo
  • 常见视频封装格式对比
  • LeetCode 317 离建筑物最近的距离
  • 科技赋能医疗:陪诊小程序系统开发,让就医不再孤单
  • mysql中表的约束
  • weblogic JBoss漏洞 Strcts2漏洞 fastjson漏洞
  • 计算机视觉第一课opencv(四)保姆级教学
  • solidity地址、智能合约、交易概念
  • 【完整源码+数据集+部署教程】高速公路施工区域物体检测系统源码和数据集:改进yolo11-RepNCSPELAN
  • FOC-双电阻采样-无刷-AC/DC(吹风筒项目)
  • 笔记本电脑频繁出现 vcomp140.dll丢失怎么办?结合移动设备特性,提供适配性强的修复方案
  • 函数的逆与原象
  • flutter-使用url_launcher打开链接/应用/短信/邮件和评分跳转等
  • LoraConfig target modules加入embed_tokens(64)
  • Java项目打包成EXE全攻略
  • Spring Boot 项目文件上传安全与优化:OSS、MinIO、Nginx 分片上传实战
  • 用 C++ 创建单向链表 forward list
  • “我店 + RWA”来袭:重构商业价值,解锁消费投资新密码
  • HarmonyOS权限管理应用
  • 【序列晋升】20 Spring Cloud Function 函数即服务(FaaS)
  • FPGA实现1553B BC控制器IP方案
  • LeetCode259~282题解
  • 吴恩达机器学习作业五:神经网络正向传播
  • 【前端教程】从性别统计类推年龄功能——表单交互与数据处理进阶
  • 【前端教程】从零开始学JavaScript交互:7个经典事件处理案例解析
  • C++Primer笔记——第六章:函数(下)