当前位置: 首页 > news >正文

响应式编程框架Reactor【5】

文章目录

  • 八、Reactor测试
    • 8.1 核心工具与依赖
    • 8.2 依赖引入
    • 8.3 StepVerifier
      • 8.3.1 基础用法:验证正常流与完成信号
      • 8.3.2 验证错误信息号
      • 8.3.3 验证条件与断言
      • 8.3.4 验证背压行为
      • 8.3.5 验证时间依赖操作(虚拟时间)
      • 8.3.6 验证资源清理与取消
    • 8.4 测试副作用与状态变化
      • 8.3.7 验证多个元素
    • 8.5 TestPublisher:手动控制信号发射
      • 8.5.1 模拟正常数据流
      • 8.5.2 模拟错误流
      • 8.5.3 模拟背压请求
      • 8.5.4 模拟错误处理
    • 8.6 并发与线程切换测试
    • 8.7 测试最佳实践
    • 8.8 StepVerifier工作流程
  • 九、实际应用场景
    • 9.1 Web应用中的响应式处理
    • 9.2 数据库响应式访问

八、Reactor测试

在 Reactor 响应式编程中,测试是确保流行为正确性的关键环节。由于响应式流的异步性、序列性和副作用特性,传统的单元测试方法(如直接断言返回值)难以适用。Reactor 提供了专门的测试库reactor-test,通过StepVerifier等工具实现对流的声明式验证,可精准测试元素序列、完成 / 错误信号、背压行为、时间依赖及副作用等场景。

8.1 核心工具与依赖

Reactor 的测试能力主要依赖reactor-test模块,其中:

  • StepVerifier:核心类,用于声明式验证流的行为(元素序列、信号、背压等)。

  • TestPublisher:用于手动发送信号(next/error/complete),测试订阅者对各种信号的响应。

  • VirtualTimeScheduler:虚拟时间调度器,用于模拟时间流逝,测试延迟、超时等时间依赖操作。

    1. StepVerifier
      1. 作用:用于测试 PublisherMono/Flux)的输出序列。
      2. 特点:声明式、链式调用、支持虚拟时间(Virtual Time)测试延迟操作。
    1. TestPublisher<T>
      1. 作用:一个可编程的 Publisher,用于模拟外部服务或测试背压、错误等场景。

8.2 依赖引入

需在项目中添加reactor-test依赖:

<!-- Maven -->
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.5.9</version> <!-- 与Reactor核心版本一致 --><scope>test</scope>
</dependency>

8.3 StepVerifier

流行为的声明式验证

StepVerifier是 Reactor 测试的核心工具,它通过 “步骤式” 声明来验证流的完整生命周期:从订阅开始,到元素发射、信号(完成 / 错误)触发,再到背压交互。

8.3.1 基础用法:验证正常流与完成信号

场景:测试一个简单流(如Flux.range(1,3))是否按预期发射元素并正常完成。

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.junit.Test;public class StepVerifierBasicTest {@Testpublic void testNormalFlux() {// 待测试的流:发射1,2,3后完成Flux<Integer> flux = Flux.range(1, 3);// StepVerifier验证流程StepVerifier.create(flux)// 预期第一个元素为1.expectNext(1)// 预期第二个元素为2.expectNext(2)// 预期第三个元素为3.expectNext(3)// 预期流正常完成.expectComplete()// 执行验证.verify();}
}

流程分析

测试代码StepVerifier被测试Fluxcreate(flux) 绑定待测试流订阅(subscribe)发射元素1验证expectNext(1)通过发射元素2验证expectNext(2)通过发射元素3验证expectNext(3)通过发送complete信号验证expectComplete()通过verify() 执行完成,测试通过测试代码StepVerifier被测试Flux

8.3.2 验证错误信息号

场景:测试流在特定条件下是否抛出预期的错误(如RuntimeException)。

   @Testpublic void testErrorFlux() {// 待测试的流:发射1后抛出异常Flux<Integer> errorFlux = Flux.range(1, 2).map(num -> {if (num == 2) {throw new RuntimeException("模拟错误");}return num;});StepVerifier.create(errorFlux).expectNext(1) // 预期第一个元素为1// 预期抛出RuntimeException,且消息匹配.expectErrorMatches(error ->error instanceof RuntimeException &&error.getMessage().equals("模拟错误")).verify(); // 执行验证}

8.3.3 验证条件与断言

@Test
public void testExpectPredicate() {Flux<Integer> flux = Flux.just(2, 4, 6, 8);StepVerifier.create(flux).expectNextMatches(n -> n % 2 == 0)  // 验证元素满足条件.expectNextMatches(n -> n > 0).thenConsumeWhile(n -> n < 10)       // 消费并验证所有后续元素 < 10.expectComplete().verify();
}

8.3.4 验证背压行为

场景:测试流是否正确响应背压(消费者通过request(n)控制元素请求量)。

@Test
public void testBackpressure() {// 待测试的流:发射1-5Flux<Integer> flux = Flux.range(1, 5);StepVerifier.create(flux)// 模拟消费者初始请求2个元素.thenRequest(2).expectNext(1, 2) // 验证前2个元素// 再请求2个元素.thenRequest(2).expectNext(3, 4) // 验证接下来2个元素// 最后请求1个元素.thenRequest(1).expectNext(5) // 验证最后1个元素.expectComplete().verify();
}

8.3.5 验证时间依赖操作(虚拟时间)

场景:测试含延迟的流(如delayElements),使用虚拟时间避免实际等待,加速测试。

@Test
public void testDelayElements() {// 待测试的流:每个元素延迟1秒发射(共3个元素)Flux<Integer> delayedFlux = Flux.range(1, 3).delayElements(Duration.ofSeconds(1));// 使用虚拟时间调度器VirtualTimeScheduler.getOrSet();StepVerifier.withVirtualTime(() -> delayedFlux).expectSubscription() // 验证订阅成功// 向前推进3秒(覆盖所有延迟).thenAwait(Duration.ofSeconds(3)).expectNext(1, 2, 3) // 验证所有元素被发射.expectComplete().verify();
}

关键说明

  • withVirtualTime(Supplier<Publisher>):启用虚拟时间,流的时间操作(如delay)会基于虚拟时钟而非真实时间。
  • thenAwait(Duration):手动推进虚拟时间,无需实际等待,大幅提升测试速度。

真实时间超时

@Test
public void testWithTimeout() {Flux<Long> slowFlux = Flux.interval(Duration.ofSeconds(2)) // 每 2 秒发一个.take(1); // 只取一个StepVerifier.create(slowFlux, StepVerifierOptions.create().withVirtualTime()).expectNoEvent(Duration.ofSeconds(1))     // 1 秒内无事件.expectNextCount(1)                       // 之后收到 1 个.expectComplete().verify(Duration.ofSeconds(3));           // 整体超时 3 秒
}

虚拟时间优势:测试 interval, delay, timeout 等操作符时,无需真实等待,大幅提升测试速度。

8.3.6 验证资源清理与取消

@Test
public void testCancel() {Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100));StepVerifier.create(intervalFlux).expectNextCount(2)           // 收到 2 个.thenCancel()                 // 主动取消订阅.verify();                    // 验证取消成功
}

8.4 测试副作用与状态变化

响应式流中常包含副作用操作(如doOnNextdoOnErrordoFinally),需验证这些副作用是否按预期执行(如日志记录、状态更新)

@Test
public void testSideEffects() {List<Integer> processed = new ArrayList<>(); // 记录副作用执行结果// 待测试的流:发射1-3,每次发射后执行副作用(添加到list)Flux<Integer> fluxWithSideEffect = Flux.range(1, 3).doOnNext(num -> processed.add(num)) // 副作用:记录处理的元素.doOnComplete(() -> processed.add(-1)); // 完成时添加标记StepVerifier.create(fluxWithSideEffect).expectNext(1, 2, 3).expectComplete().verify();// 验证副作用是否按预期执行assert processed.equals(List.of(1, 2, 3, -1));
}

8.3.7 验证多个元素

@Test
public void testExpectNexts() {Flux<Integer> flux = Flux.range(1, 3); // 1, 2, 3StepVerifier.create(flux).expectNext(1, 2, 3)              // 一次性验证多个元素.expectComplete().verify();// 或者使用集合StepVerifier.create(flux).expectNextSequence(List.of(1, 2, 3)).expectComplete().verify();
}

8.5 TestPublisher:手动控制信号发射

TestPublisher 用于模拟一个 Publisher,常用于单元测试中替换外部依赖(如数据库、HTTP 客户端)

TestPublisher用于手动发送信号next/error/complete),适合测试订阅者(或操作符)对异常信号序列的响应(如重复发射、提前完成等)。

8.5.1 模拟正常数据流

@Test
public void testWithTestPublisher() {// 1. 创建 TestPublisherTestPublisher<String> testPublisher = TestPublisher.create();// 2. 被测试的服务使用这个 publisherServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);// 3. 手动发送数据testPublisher.next("Data1", "Data2");testPublisher.complete();// 4. 验证服务行为(假设服务有回调或状态)// 例如:assertThat(service.getProcessedData()).contains("Data1", "Data2");
}

8.5.2 模拟错误流

@Test
public void testErrorWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();ServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);testPublisher.error(new IOException("Network failed"));// 验证服务正确处理错误// assertThat(service.hasError()).isTrue();
}

8.5.3 模拟背压请求

@Test
public void testBackpressureWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();StepVerifier.create(testPublisher.flux()).thenRequest(1)               // 主动请求 1 个.expectNext("dummy")          // TestPublisher 默认发送 dummy 值.then(() -> testPublisher.next("A")) // 发送真实数据.expectNext("A").thenRequest(1).then(() -> testPublisher.next("B")).expectNext("B").thenCancel().verify();
}

8.5.4 模拟错误处理

@Test
public void testInvalidSignalSequence() {// 创建一个严格模式的TestPublisher(不允许无效信号序列)TestPublisher<Integer> testPublisher = TestPublisher.create();// 手动发送信号:先发射元素,再发送错误,最后尝试发射元素(无效操作)testPublisher.next(1);testPublisher.error(new RuntimeException("测试错误"));testPublisher.next(2); // 错误后发射元素是无效的// 验证订阅者是否能捕获无效信号导致的错误Flux<Integer> flux = testPublisher.flux();StepVerifier.create(flux).expectNext(1).expectError(RuntimeException.class)// 验证错误消息(包含无效信号的描述).verify();
}

在这里插入图片描述

关键说明

  • TestPublisher.createStrict():严格模式,会检查信号序列的合法性(如错误 / 完成后不能再发射元素),非法操作会抛出异常。
  • TestPublisher.createNonStrict():非严格模式,允许非法信号序列,适合测试容错逻辑。

8.6 并发与线程切换测试

测试涉及线程切换(publishOn/subscribeOn)的流时,需确保操作在预期线程上执行,且并发场景下行为正确。

示例:验证线程切换后的执行线程

@Test
public void testThreadSwitch() {// 待测试的流:先在boundedElastic读取,再在parallel处理Flux<Integer> flux = Flux.range(1, 2).subscribeOn(Schedulers.boundedElastic()).doOnNext(num -> {// 验证上游操作在boundedElastic线程执行assert Thread.currentThread().getName().startsWith("boundedElastic");}).publishOn(Schedulers.parallel()).doOnNext(num -> {// 验证下游操作在parallel线程执行assert Thread.currentThread().getName().startsWith("parallel");});StepVerifier.create(flux).expectNext(1, 2).expectComplete().verify();
}

8.7 测试最佳实践

  1. 优先使用StepVerifier:它覆盖了 90% 以上的测试场景,声明式语法清晰且能验证完整流生命周期。
  2. 虚拟时间加速测试:对含delaytimeout的流,用withVirtualTime避免实际等待,测试速度提升 10 倍以上。
  3. 验证副作用:通过外部状态(如ListAtomicInteger)记录副作用,确保doOnXXX操作按预期执行。
  4. 严格模式测试边界情况:用TestPublisher的严格模式测试非法信号(如重复完成、错误后发射元素),验证流的容错性。
  5. 隔离测试:每个测试方法应独立,避免共享状态(如调度器、计数器)导致测试相互干扰。
  6. 断言具体化:使用expectErrorMatches而非expectError(),精准验证错误类型和消息;用expectNextSequence验证批量元素。

8.8 StepVerifier工作流程

expectNext
expectError
expectComplete
expectNoEvent
创建 StepVerifier
定义期望序列
期望类型
验证 onNext 信号
验证 onError 信号
验证 onComplete 信号
验证超时期内无事件
继续链式调用
调用 verify()
执行验证
通过?
测试成功
抛出 AssertionError

九、实际应用场景

9.1 Web应用中的响应式处理

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class WebApplicationExamples {// 使用WebClient进行HTTP调用public Flux<User> getUsersWithPosts() {WebClient webClient = WebClient.create("https://api.example.com");return webClient.get().uri("/users").retrieve().bodyToFlux(User.class).flatMap(user -> webClient.get().uri("/users/{id}/posts", user.getId()).retrieve().bodyToFlux(Post.class).collectList().map(posts -> {user.setPosts(posts);return user;})).timeout(Duration.ofSeconds(5)).onErrorResume(e -> {// 记录错误但继续处理其他用户System.err.println("Error fetching posts: " + e.getMessage());return Mono.just(new User()); // 返回空用户或默认值});}// 批量处理与缓冲public Flux<Result> processInBatches(Flux<Item> items) {return items.buffer(100) // 每100个元素一批.delayElements(Duration.ofMillis(500)) // 控制速率.flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.parallel()));}private Mono<Result> processBatch(List<Item> batch) {return Mono.fromCallable(() -> {// 模拟批量处理return new Result("Processed " + batch.size() + " items");});}// 实时数据流处理public Flux<Event> processRealTimeEvents(Flux<Event> eventStream) {return eventStream.window(Duration.ofSeconds(1)) // 1秒窗口.flatMap(window -> window.groupBy(Event::getType).flatMap(group -> group.reduce((e1, e2) -> new Event(e1.getType(), e1.getValue() + e2.getValue()))));}
}// 简单的数据模型
class User {private String id;private List<Post> posts;// getters and setters
}class Post {private String id;private String content;// getters and setters
}class Item {private String id;// getters and setters
}class Result {private String message;// constructor, getters
}class Event {private String type;private int value;// constructor, getters
}

9.2 数据库响应式访问

伪代码,需要引入其它的依赖项

import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class DatabaseExamples {private final R2dbcEntityTemplate template;public Flux<User> findActiveUsers() {return template.select(User.class).from("users").matching(where("active").is(true)).all().delayElements(Duration.ofMillis(10)) // 控制数据库压力.onBackpressureBuffer(1000); // 缓冲背压}public Mono<Void> saveUsersInTransaction(Flux<User> users) {return template.inTransaction(() -> users.buffer(100) // 每100个用户一批提交.flatMap(batch -> template.insert(batch).then()));}public Flux<User> findUsersWithRetry() {return template.select(User.class).from("users").all().retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onErrorResume(e -> {System.err.println("Database unavailable: " + e.getMessage());return Flux.empty(); // 返回空流而不是错误});}
}
http://www.xdnf.cn/news/1400167.html

相关文章:

  • Spring代理的特点
  • AI-调查研究-65-机器人 机械臂控制技术的前世今生:从PLC到MPC
  • 【MCP系列教程】 Python 实现 FastMCP StreamableHTTP MCP:在通义灵码 IDE 开发并部署至阿里云百炼
  • JsMind 常用配置项
  • 【计算机网络】HTTP是什么?
  • 基于Docker部署的Teable应用
  • Linux驱动开发重要操作汇总
  • “人工智能+”政策驱动下的技术重构、商业变革与实践路径研究 ——基于国务院《关于深入实施“人工智能+”行动的意见》的深度解读
  • wpf之依赖属性
  • 桌面GIS软件FlatGeobuf转Shapefile代码分享
  • 学习游戏制作记录(视觉上的优化)
  • 第三弹、AI、LLM大模型是什么?
  • Visual Studio(vs)免费版下载安装C/C++运行环境配置
  • openEuler2403安装部署Redis8
  • FPGA学习笔记——SPI读写FLASH
  • 【运维篇第三弹】《万字带图详解分库分表》从概念到Mycat中间件使用再到Mycat分片规则,详解分库分表,有使用案例
  • 小迪Web自用笔记7
  • 【Linux】如何使用 Xshell 登录 Linux 操作系统
  • SC税务 登录滑块 分析
  • 拦截器Intercepter
  • hello算法笔记 01
  • Isaac Lab Newton 人形机器人强化学习 Sim2Real 训练与部署
  • 下一代 AI 交互革命:自然语言对话之外,“意念控制” 离商用还有多远?
  • 在 .NET Core 中实现基于策略和基于角色的授权
  • HarmonyOS应用的多Module设计机制:构建灵活高效的应用程序
  • 【瑞吉外卖】手机号验证码登录(用QQ邮件发送代替)
  • python制作一个股票盯盘系统
  • NV032NV037美光固态闪存NV043NV045
  • 基于开源AI大模型AI智能名片S2B2C商城小程序的产地优势产品销售策略研究
  • 前端代码结构详解