当前位置: 首页 > ops >正文

响应式编程框架Reactor【8】

文章目录

  • 十三、性能优化与最佳实践
  • 十四、Reactor工作原理与流程图
    • 14.1 Reactor执行流程
    • 14.2 Reactor背压机制
  • 十五、Reactor Context API详解
    • 15.1 Context的设计目的
    • 15.2 Context 与 ThreadLocal 的区别
    • 15.3 Context基本用法
      • 15.3.1 创建和访问 Context
      • 15.3.2 在反应式流当中使用Context
    • 15.4 Context高级用法
      • 15.4.1 在操作符之间传递Context
      • 15.4.2 Context 与错误处理
      • 15.4.3 嵌套 Context 处理
    • 15.5 Context在实际应用中的作用
      • 15.5.1 Web应用中的请求上下文
      • 15.5.2 分布式追踪与日志
      • 15.5.3 权限验证与安全上下文
    • 15.6 Context 工作原理与流程图
      • 15.6.1 Context 在反应式流中的传递
      • 15.6.2 Context 的读取顺序
    • 15.7 最佳实践与注意事项
      • 15.7.1 合理使用 Context
      • 15.7.2 Context 键命名规范
      • 16.7.3 错误处理
    • 15.8 注意事项
      • 15.8.1 Context是不可变的
      • 15.8.2 Context 与线程安全
      • 15.8.3 性能考虑
    • 15.8 总结
  • 十六、总结与实践经验
    • 16.1 Reactor核心优势
    • 16.2 实践经验总结
    • 16.3 适用场景

十三、性能优化与最佳实践

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;public class PerformanceBestPractices {// 1. 合理使用调度器public Flux<String> ioIntensiveOperation(Flux<String> inputs) {return inputs.flatMap(input -> Mono.fromCallable(() -> blockingIoOperation(input)).subscribeOn(Schedulers.boundedElastic()) // IO操作使用弹性线程池);}private String blockingIoOperation(String input) {// 模拟阻塞IO操作try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return input.toUpperCase();}// 2. 避免在响应式链中进行阻塞操作public Flux<String> nonBlockingAlternative(Flux<String> inputs) {return inputs.delayElements(Duration.ofMillis(100)) // 使用延迟而不是睡眠.map(String::toUpperCase);}// 3. 合理使用缓存public Mono<String> getWithCache(String key) {return Mono.fromCallable(() -> expensiveOperation(key)).cache(Duration.ofMinutes(5)) // 缓存5分钟.subscribeOn(Schedulers.parallel());}private String expensiveOperation(String key) {// 模拟昂贵操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "Processed: " + key;}// 4. 背压感知处理public Flux<Data> processWithBackpressureAwareness(Flux<Data> dataStream) {AtomicInteger counter = new AtomicInteger();return dataStream.onBackpressureBuffer(1000, data -> System.out.println("Dropping: " + data)).doOnNext(data -> {if (counter.incrementAndGet() % 100 == 0) {System.out.println("Processed " + counter.get() + " items");}}).subscribeOn(Schedulers.parallel());}// 5. 合理使用flatMap与concatMappublic Flux<Result> processWithConcurrencyControl(Flux<Request> requests) {return requests.flatMap(request -> processRequest(request).subscribeOn(Schedulers.parallel()),10 // 最大并发数);}private Mono<Result> processRequest(Request request) {return Mono.fromCallable(() -> {// 处理请求return new Result("Processed: " + request.getId());});}// 6. 监控与指标收集public Flux<Data> processWithMetrics(Flux<Data> dataStream) {return dataStream.name("dataProcessing") // 为操作命名以便监控.metrics() // 收集指标.doOnNext(data -> {// 业务逻辑}).doOnError(error -> {// 错误处理与记录});}
}// 简单的数据模型
class Data {private String content;// getters and setters
}class Request {private String id;// getters and setters
}class Result {private String message;// constructor, getters
}

在这里插入图片描述

十四、Reactor工作原理与流程图

14.1 Reactor执行流程

SubscriberPublisher (Flux/Mono)OperatorsSchedulersubscribe()创建操作链安排执行(如果需要)在指定线程执行onSubscribe(Subscription)request(n)请求数据onNext(data)应用转换/过滤onNext(processedData)request(m) (更多数据)onComplete() (数据完成)onComplete()错误处理路径onError(throwable)onError(throwable)SubscriberPublisher (Flux/Mono)OperatorsScheduler

14.2 Reactor背压机制

不能
BUFFER
DROP
LATEST
ERROR
Publisher 产生数据
Subscriber 能否处理?
发送数据 onNext
更新请求计数
背压策略
缓冲数据
丢弃数据
丢弃最旧, 保持最新
发出错误信号
缓冲区满?
应用溢出策略
还有请求额度?
等待新请求

十五、Reactor Context API详解

Reactor 的 Context API 提供了一种在反应式流中传递上下文信息的机制,类似于传统编程中的 ThreadLocal,但专门为反应式编程设计。它允许在流的处理过程中传递和访问上下文数据,而不会破坏反应式流的不可变性和链式特性。

15.1 Context的设计目的

  1. 跨操作符传递数据:在反应式链的不同操作符之间传递上下文信息
  2. 避免方法参数污染:不需要将上下文数据作为参数在每个方法间传递
  3. 线程安全:适用于反应式编程中线程切换的场景
  4. 与订阅相关:每个订阅都有自己独立的 Context

15.2 Context 与 ThreadLocal 的区别

上下文传递机制
ThreadLocal
Reactor Context
基于线程存储
同步编程适用
线程切换时问题
基于订阅存储
反应式编程适用
线程安全

15.3 Context基本用法

15.3.1 创建和访问 Context

package cn.tcmeta.context;import reactor.util.context.Context;/*** @author: laoren* @description: Context基本示例* @version: 1.0.0*/
public class ContextBasicExample {public static void main(String[] args) {// 1. 创建Context对象Context context = Context.of("user", "jack", "requestID", "1234");// 2. 读取Context对象当中的值String username = context.get("user");var requestID = context.get("requestID");System.out.println("username: " + username);System.out.println("requestID: " + requestID);// 3. 使用【getOrDefault】安全获取值String hasUser = context.getOrDefault("userRole", "defaultRole");System.out.println("hasUser: " + hasUser);// 4. 检查是否存在boolean hasUserRole = context.hasKey("userRole");System.out.println("hasUserRole: " + hasUserRole);// 5. 创建新的Context对象【Context是不可变的】Context newContext = context.put("department", "工程师");System.out.println("department: " + newContext.get("department"));// 6. 删除键Context withoutRequestId = newContext.delete("requestId");System.out.println("withoutRequestId: " + withoutRequestId.get("requestId"));}
}

在这里插入图片描述

15.3.2 在反应式流当中使用Context

1. 使用 contextWrite 将 Context 注入流中

// 1. 使用 contextWrite 将 Context 注入流中
Mono<String> result = Mono.just("Hello").flatMap(value ->Mono.deferContextual(ctx -> {String user = ctx.get("user");return Mono.just(value + " " + user);})).contextWrite(Context.of("user", "Alice"));result.subscribe(System.out::println); // 输出: Hello Alice

2. 多层Context写入

// 多层 Context 写入
Mono<String> multiLayer = Mono.just("Message").flatMap(value ->Mono.deferContextual(ctx -> {String user = ctx.get("user");String role = ctx.get("role");return Mono.just(value + " for " + user + " (" + role + ")");})).contextWrite(Context.of("role", "admin")) // 第二层.contextWrite(Context.of("user", "Bob"));   // 第一层(最外层)multiLayer.subscribe(System.out::println); // 输出: Message for Bob (admin)

3. Context 的读取顺序是从内到外

Mono<String> orderExample = Mono.deferContextual(ctx -> {// 这里会读取到最内层的 userString user = ctx.get("user");return Mono.just("User: " + user);
}).contextWrite(Context.of("user", "inner"))  // 内层.contextWrite(Context.of("user", "outer")); // 外层(会被内层覆盖)orderExample.subscribe(System.out::println); // 输出: User: inner

15.4 Context高级用法

15.4.1 在操作符之间传递Context

package cn.tcmeta.context;import reactor.core.publisher.Mono;
import reactor.util.context.Context;/*** @author: laoren* @date: 2025/8/27 19:46* @description: 在操作符之间传递 Context* @version: 1.0.0*/
public class ContextAcrossOperators {static class UserService {Mono<String> getUserName(String userId) {return Mono.deferContextual(ctx -> {String requestId = ctx.getOrDefault("requestId", "unknow");System.out.println("requestId: " + requestId + " - userId: " + userId);return Mono.just("User_" + userId);});}}static class OrderService {Mono<String> getOrderInfo(String orderId) {return Mono.deferContextual(ctx -> {String requestId = ctx.getOrDefault("requestId", "unknow");String user = ctx.getOrDefault("user", "unknow");System.out.println("requestId: " + requestId + " - user: " + user + " - orderId: " + orderId);return Mono.just("Order_" + orderId + " _for_ " + user);});}}public static void main(String[] args) {UserService userService = new UserService();OrderService orderService = new OrderService();// 在流处理过程中传递 ContextMono<String> result = Mono.just("123").flatMap(userService::getUserName).flatMap(userName ->Mono.deferContextual(ctx -> {// 将用户信息存入 Contextreturn Mono.just("order456").flatMap(orderService::getOrderInfo).contextWrite(Context.of("user", userName));})).contextWrite(Context.of("requestId", "req-789"));result.subscribe(System.out::println);// 输出:// Getting user 123 with requestId: req-789// Getting order order456 for User_123 with requestId: req-789// Order_order456_for_User_123}
}

15.4.2 Context 与错误处理

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ContextErrorHandling {public static void main(String[] args) {// 在错误处理中访问 ContextMono<String> result = Mono.error(new RuntimeException("Something went wrong")).onErrorResume(e -> Mono.deferContextual(ctx -> {String requestId = ctx.getOrDefault("requestId", "unknown");String errorMessage = "Error in request " + requestId + ": " + e.getMessage();return Mono.just(errorMessage);})).contextWrite(Context.of("requestId", "req-123"));result.subscribe(System.out::println); // 输出: Error in request req-123: Something went wrong// 在 doOnError 中访问 ContextMono<String> withDoOnError = Mono.just("data").flatMap(d -> Mono.error(new RuntimeException("Processing error"))).doOnError(e -> Mono.deferContextual(ctx -> {String requestId = ctx.get("requestId");System.err.println("Error occurred for request: " + requestId);return Mono.empty();}).subscribe() // 注意:需要订阅这个内部流).onErrorResume(e -> Mono.just("fallback")).contextWrite(Context.of("requestId", "req-456"));withDoOnError.subscribe(System.out::println);// 输出: // Error occurred for request: req-456// fallback}
}

15.4.3 嵌套 Context 处理

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class NestedContextExample {public static void main(String[] args) {// 模拟外部服务调用Mono<String> externalServiceCall = Mono.deferContextual(ctx -> {String authToken = ctx.get("authToken");String requestId = ctx.get("requestId");System.out.println("Calling service with token: " + authToken + ", requestId: " + requestId);return Mono.just("Service response");});// 嵌套 Context 使用Mono<String> result = Mono.just("process").flatMap(operation -> Mono.deferContextual(outerCtx -> {String userId = outerCtx.get("userId");// 创建内部 Context(添加认证令牌)Context innerContext = outerCtx.put("authToken", "token-for-" + userId);return externalServiceCall.contextWrite(innerContext).flatMap(serviceResponse -> Mono.deferContextual(innerCtx -> {String requestId = innerCtx.get("requestId");return Mono.just("Processed: " + serviceResponse + " for user " + userId + " (request: " + requestId + ")");}));})).contextWrite(Context.of("userId", "alice", "requestId", "req-123"));result.subscribe(System.out::println);// 输出:// Calling service with token: token-for-alice, requestId: req-123// Processed: Service response for user alice (request: req-123)}
}

15.5 Context在实际应用中的作用

15.5.1 Web应用中的请求上下文

import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.web.server.ServerWebExchange;public class WebContextExample {// 模拟的 Web 处理器static class WebHandler {Mono<String> handleRequest(ServerWebExchange exchange) {return Mono.just("request-data").flatMap(data -> processData(data)).contextWrite(Context.of("exchange", exchange,"requestId", exchange.getRequest().getId(),"user", getCurrentUser(exchange)));}Mono<String> processData(String data) {return Mono.deferContextual(ctx -> {ServerWebExchange exchange = ctx.get("exchange");String requestId = ctx.get("requestId");String user = ctx.get("user");System.out.println("Processing data for user: " + user + ", requestId: " + requestId);return Mono.just("Processed: " + data);});}String getCurrentUser(ServerWebExchange exchange) {// 模拟从 exchange 中获取用户信息return "user-alice";}}// 模拟的 ServerWebExchangestatic class MockServerWebExchange {private MockServerRequest request = new MockServerRequest();public MockServerRequest getRequest() {return request;}}static class MockServerRequest {public String getId() {return "req-12345";}}public static void main(String[] args) {WebHandler handler = new WebHandler();MockServerWebExchange exchange = new MockServerWebExchange();handler.handleRequest(exchange).subscribe(System.out::println);// 输出: // Processing data for user: user-alice, requestId: req-12345// Processed: request-data}
}

15.5.2 分布式追踪与日志

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.UUID;public class TracingContextExample {// 模拟的分布式追踪器static class Tracer {void logEvent(String event, String requestId) {System.out.println("[" + requestId + "] " + event);}Mono<String> startSpan(String name, String requestId) {return Mono.fromCallable(() -> {String spanId = UUID.randomUUID().toString();logEvent("Started span: " + name + " (" + spanId + ")", requestId);return spanId;});}Mono<Void> endSpan(String spanId, String requestId) {return Mono.fromRunnable(() -> logEvent("Ended span: " + spanId, requestId));}}public static void main(String[] args) {Tracer tracer = new Tracer();String requestId = UUID.randomUUID().toString();Flux.range(1, 3).flatMap(i -> Mono.deferContextual(ctx -> {String currentRequestId = ctx.get("requestId");return tracer.startSpan("process-item-" + i, currentRequestId).flatMap(spanId -> processItem(i).doOnSuccess(result -> tracer.endSpan(spanId, currentRequestId).subscribe()).doOnError(error -> tracer.endSpan(spanId, currentRequestId).subscribe()));})).contextWrite(Context.of("requestId", requestId)).subscribe();// 输出示例:// [a1b2c3d4...] Started span: process-item-1 (e5f6g7h8...)// [a1b2c3d4...] Ended span: e5f6g7h8...// [a1b2c3d4...] Started span: process-item-2 (i9j0k1l2...)// [a1b2c3d4...] Ended span: i9j0k1l2...// [a1b2c3d4...] Started span: process-item-3 (m3n4o5p6...)// [a1b2c3d4...] Ended span: m3n4o5p6...}static Mono<String> processItem(int item) {return Mono.just("Processed item " + item).delayElement(java.time.Duration.ofMillis(100));}
}

15.5.3 权限验证与安全上下文

import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class SecurityContextExample {// 模拟的用户认证信息static class Authentication {private final String username;private final String[] roles;public Authentication(String username, String[] roles) {this.username = username;this.roles = roles;}public String getUsername() {return username;}public String[] getRoles() {return roles;}public boolean hasRole(String role) {for (String r : roles) {if (r.equals(role)) {return true;}}return false;}}// 模拟的资源服务static class ResourceService {Mono<String> getProtectedResource(String resourceId) {return Mono.deferContextual(ctx -> {Authentication auth = ctx.get("auth");if (!auth.hasRole("ADMIN")) {return Mono.error(new SecurityException("Access denied"));}return Mono.just("Protected resource: " + resourceId);});}}public static void main(String[] args) {ResourceService service = new ResourceService();// 有权限的用户Authentication adminAuth = new Authentication("alice", new String[]{"USER", "ADMIN"});service.getProtectedResource("secret-data").contextWrite(Context.of("auth", adminAuth)).subscribe(System.out::println,error -> System.err.println("Error: " + error.getMessage()));// 输出: Protected resource: secret-data// 无权限的用户Authentication userAuth = new Authentication("bob", new String[]{"USER"});service.getProtectedResource("secret-data").contextWrite(Context.of("auth", userAuth)).subscribe(System.out::println,error -> System.err.println("Error: " + error.getMessage()));// 输出: Error: Access denied}
}

15.6 Context 工作原理与流程图

15.6.1 Context 在反应式流中的传递

SubscriberOperator 1Operator 2Operator 3subscribe()向下游传递向下游传递contextWrite(ContextA)处理数据 + ContextA处理数据 + ContextA最终结果contextWrite(ContextB)处理数据 + ContextB最终结果SubscriberOperator 1Operator 2Operator 3

15.6.2 Context 的读取顺序

读取 Context 值
当前操作符有 Context?
读取当前 Context
向上游查找 Context
找到 Context?
返回找到的值
使用默认值或抛出异常
返回找到的值

15.7 最佳实践与注意事项

15.7.1 合理使用 Context

// 好的做法:将相关的上下文信息放在 Context 中
Context context = Context.of("requestId", "req-123","userId", "user-alice","traceId", "trace-456"
);// 不好的做法:将不相关或过多的数据放在 Context 中
Context badContext = Context.of("requestId", "req-123","unrelatedData", largeObject, // 大对象不适合放在 Context 中"config", configMap // 配置信息应该通过其他方式传递
);

15.7.2 Context 键命名规范

// 使用有意义的键名,避免冲突
public static final String KEY_REQUEST_ID = "com.example.requestId";
public static final String KEY_USER = "com.example.user";Context context = Context.of(KEY_REQUEST_ID, "req-123", KEY_USER, "alice");

16.7.3 错误处理

Mono.deferContextual(ctx -> {// 使用 getOrDefault 避免 NoSuchElementExceptionString value = ctx.getOrDefault("key", "default");// 或者使用 hasKey 检查if (ctx.hasKey("key")) {return Mono.just(ctx.get("key"));} else {return Mono.error(new IllegalArgumentException("Missing key"));}
});

15.8 注意事项

15.8.1 Context是不可变的

Context original = Context.of("key", "value");
Context updated = original.put("newKey", "newValue"); // 返回新对象System.out.println(original.hasKey("newKey")); // false
System.out.println(updated.hasKey("newKey"));   // true

15.8.2 Context 与线程安全

// Context 是线程安全的,可以在多线程环境中使用
Mono.fromCallable(() -> {// 在另一个线程中访问 Contextreturn "result";}).flatMap(result -> Mono.deferContextual(ctx -> {String requestId = ctx.get("requestId"); // 安全访问return Mono.just(result + " for " + requestId);})).contextWrite(Context.of("requestId", "req-123")).subscribeOn(Schedulers.parallel()) // 切换到并行调度器.subscribe();

15.8.3 性能考虑

// 避免在热点路径中频繁创建大的 Context
Flux.range(1, 1000).contextWrite(Context.of("data", largeData)) // 大的数据对象.subscribe(); // 这会在每个元素上创建 Context 副本// 更好的做法:将大数据存储在外部,Context 中只存储引用
Flux.range(1, 1000).contextWrite(Context.of("dataRef", dataReference)).subscribe();

15.8 总结

Reactor 的 Context API 提供了在反应式流中传递上下文信息的强大机制。通过本文的详细讲解,我们可以总结出以下几点:

  1. 核心特性
    • 基于订阅的上下文存储
    • 不可变的数据结构
    • 线程安全的访问机制
    • 支持多层上下文嵌套
  2. 适用场景
    • 请求范围的数据传递(如请求ID、用户信息)
    • 分布式追踪和日志记录
    • 权限验证和安全上下文
    • 跨操作符的配置传递
  3. 最佳实践
    • 合理选择存储在 Context 中的数据
    • 使用有意义的键名避免冲突
    • 使用 getOrDefault 或 hasKey 安全访问数据
    • 避免在 Context 中存储大对象
  4. 注意事项
    • Context 是不可变的,每次修改都会创建新对象
    • Context 的读取顺序是从内到外
    • 需要考虑性能影响,避免过度使用

Context API 是 Reactor 框架中处理上下文传递的标准方式,掌握了它的使用能够更好地构建可维护、可追踪的反应式应用程序。

十六、总结与实践经验

16.1 Reactor核心优势

  1. 高效的资源利用:通过非阻塞IO和合理的线程模型,实现高并发处理
  2. 强大的背压处理:内置多种背压策略,防止系统过载
  3. 丰富的操作符:提供函数式、声明式的数据处理方式
  4. 与Spring生态完美集成:Spring WebFlux、Spring Data R2DBC等
  5. 良好的测试支持:StepVerifier等工具简化测试编写

16.2 实践经验总结

  1. 线程模型选择
    • CPU密集型任务使用 Schedulers.parallel()
    • IO密集型任务使用 Schedulers.boundedElastic()
    • 避免在响应式链中阻塞线程
  2. 背压策略选择
    • 实时性要求高:onBackpressureDrop
    • 数据完整性重要:onBackpressureBuffer
    • 平衡实时与完整:onBackpressureLatest
  3. 错误处理原则
    • 使用 onErrorResume 提供降级方案
    • 使用 retryWhen 实现智能重试
    • 记录错误但保持流继续运行
  4. 性能优化技巧
    • 合理使用 cache 避免重复计算
    • 使用 windowbuffer 进行批量处理
    • 控制 flatMap 的并发度
  5. 调试与监控
    • 使用 .name().metrics() 进行监控
    • 使用 Hooks.onOperatorDebug() 调试复杂流
    • 利用 StepVerifier 进行全面测试

16.3 适用场景

  1. 高并发Web应用:特别是微服务架构中的API网关、聚合服务
  2. 实时数据处理:聊天应用、实时通知、实时分析
  3. 流式ETL管道:数据转换、 enrichment、聚合
  4. 响应式数据库访问:使用R2DBC进行非阻塞数据库操作
  5. 消息驱动架构:与Kafka、RabbitMQ等消息中间件集成

Reactor 框架为Java开发者提供了强大的响应式编程能力,通过合理运用其丰富的操作符和灵活的线程模型,可以构建出高性能、高弹性的现代应用程序。

http://www.xdnf.cn/news/19494.html

相关文章:

  • Notepad++近期版本避雷
  • 中心扩展算法
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘tox’问题
  • 利用 DrissionPage 精准获取淘宝商品描述:Python 爬虫实战指南
  • C/C++、Python和Java语言的比较
  • 【职业】算法与数据结构专题
  • 15693协议ICODE SLI 系列标签应用场景说明及读、写、密钥认证操作Qt c++源码,支持统信、麒麟等国产Linux系统
  • 浪潮科技Java开发面试题及参考答案(120道题-上)
  • 利用本地电脑上的MobaXterm连接虚拟机上的Ubuntu
  • 基于SpringBoot音乐翻唱平台
  • Linux Shell 脚本中括号类型及用途
  • three.js+WebGL踩坑经验合集(10.2):镜像问题又一坑——THREE.InstancedMesh的正反面向光问题
  • UART-TCP双向桥接服务
  • 【51单片机三路抢答器定时器1工作1外部中断1】2022-11-24
  • 参数检验vs非参数检验
  • docker 网络配置
  • 【高级】系统架构师 | 2025年上半年综合真题
  • 硬件开发_基于Zigee组网的果园养殖监控系统
  • 56_基于深度学习的X光安检危险物品检测系统(yolo11、yolov8、yolov5+UI界面+Python项目源码+模型+标注好的数据集)
  • aws上创建jenkins
  • 力扣 23 912题(堆)
  • JAVA 面试宝典02
  • 工业飞拍技术:高速生产线的 “动态抓拍神器”,到底牛在哪?
  • 20250829的学习笔记
  • 基于GCN图神经网络的光伏功率预测Matlab代码
  • Spark实现推荐系统中的相似度算法
  • Proteus 仿真 + STM32CubeMX 协同开发全教程:从配置到仿真一步到位
  • 盟接之桥说制造:守正出奇:在能力圈内稳健前行,以需求导向赢得市场
  • 基于51单片机220V交流电流检测系统过流阈值报警设计
  • 增强现实—Gated-attention architectures for task-oriented language grounding