响应式编程框架Reactor【5】
文章目录
- 八、Reactor测试
- 8.1 核心工具与依赖
- 8.2 依赖引入
- 8.3 StepVerifier
- 8.3.1 基础用法:验证正常流与完成信号
- 8.3.2 验证错误信息号
- 8.3.3 验证条件与断言
- 8.3.4 验证背压行为
- 8.3.5 验证时间依赖操作(虚拟时间)
- 8.3.6 验证资源清理与取消
- 8.4 测试副作用与状态变化
- 8.3.7 验证多个元素
- 8.5 TestPublisher:手动控制信号发射
- 8.5.1 模拟正常数据流
- 8.5.2 模拟错误流
- 8.5.3 模拟背压请求
- 8.5.4 模拟错误处理
- 8.6 并发与线程切换测试
- 8.7 测试最佳实践
- 8.8 StepVerifier工作流程
- 九、实际应用场景
- 9.1 Web应用中的响应式处理
- 9.2 数据库响应式访问
八、Reactor测试
在 Reactor 响应式编程中,测试是确保流行为正确性的关键环节。由于响应式流的异步性、序列性和副作用特性,传统的单元测试方法(如直接断言返回值)难以适用。Reactor 提供了专门的测试库reactor-test
,通过StepVerifier
等工具实现对流的声明式验证,可精准测试元素序列、完成 / 错误信号、背压行为、时间依赖及副作用等场景。
8.1 核心工具与依赖
Reactor 的测试能力主要依赖reactor-test模块,其中:
-
StepVerifier
:核心类,用于声明式验证流的行为(元素序列、信号、背压等)。 -
TestPublisher
:用于手动发送信号(next
/error
/complete
),测试订阅者对各种信号的响应。 -
VirtualTimeScheduler
:虚拟时间调度器,用于模拟时间流逝,测试延迟、超时等时间依赖操作。 -
StepVerifier
- 作用:用于测试
Publisher
(Mono
/Flux
)的输出序列。 - 特点:声明式、链式调用、支持虚拟时间(Virtual Time)测试延迟操作。
- 作用:用于测试
-
TestPublisher<T>
- 作用:一个可编程的
Publisher
,用于模拟外部服务或测试背压、错误等场景。
- 作用:一个可编程的
8.2 依赖引入
需在项目中添加reactor-test
依赖:
<!-- Maven -->
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.5.9</version> <!-- 与Reactor核心版本一致 --><scope>test</scope>
</dependency>
8.3 StepVerifier
流行为的声明式验证
StepVerifier
是 Reactor 测试的核心工具,它通过 “步骤式” 声明来验证流的完整生命周期:从订阅开始,到元素发射、信号(完成 / 错误)触发,再到背压交互。
8.3.1 基础用法:验证正常流与完成信号
场景:测试一个简单流(如Flux.range(1,3)
)是否按预期发射元素并正常完成。
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.junit.Test;public class StepVerifierBasicTest {@Testpublic void testNormalFlux() {// 待测试的流:发射1,2,3后完成Flux<Integer> flux = Flux.range(1, 3);// StepVerifier验证流程StepVerifier.create(flux)// 预期第一个元素为1.expectNext(1)// 预期第二个元素为2.expectNext(2)// 预期第三个元素为3.expectNext(3)// 预期流正常完成.expectComplete()// 执行验证.verify();}
}
流程分析
8.3.2 验证错误信息号
场景:测试流在特定条件下是否抛出预期的错误(如RuntimeException
)。
@Testpublic void testErrorFlux() {// 待测试的流:发射1后抛出异常Flux<Integer> errorFlux = Flux.range(1, 2).map(num -> {if (num == 2) {throw new RuntimeException("模拟错误");}return num;});StepVerifier.create(errorFlux).expectNext(1) // 预期第一个元素为1// 预期抛出RuntimeException,且消息匹配.expectErrorMatches(error ->error instanceof RuntimeException &&error.getMessage().equals("模拟错误")).verify(); // 执行验证}
8.3.3 验证条件与断言
@Test
public void testExpectPredicate() {Flux<Integer> flux = Flux.just(2, 4, 6, 8);StepVerifier.create(flux).expectNextMatches(n -> n % 2 == 0) // 验证元素满足条件.expectNextMatches(n -> n > 0).thenConsumeWhile(n -> n < 10) // 消费并验证所有后续元素 < 10.expectComplete().verify();
}
8.3.4 验证背压行为
场景:测试流是否正确响应背压(消费者通过request(n)
控制元素请求量)。
@Test
public void testBackpressure() {// 待测试的流:发射1-5Flux<Integer> flux = Flux.range(1, 5);StepVerifier.create(flux)// 模拟消费者初始请求2个元素.thenRequest(2).expectNext(1, 2) // 验证前2个元素// 再请求2个元素.thenRequest(2).expectNext(3, 4) // 验证接下来2个元素// 最后请求1个元素.thenRequest(1).expectNext(5) // 验证最后1个元素.expectComplete().verify();
}
8.3.5 验证时间依赖操作(虚拟时间)
场景:测试含延迟的流(如delayElements
),使用虚拟时间避免实际等待,加速测试。
@Test
public void testDelayElements() {// 待测试的流:每个元素延迟1秒发射(共3个元素)Flux<Integer> delayedFlux = Flux.range(1, 3).delayElements(Duration.ofSeconds(1));// 使用虚拟时间调度器VirtualTimeScheduler.getOrSet();StepVerifier.withVirtualTime(() -> delayedFlux).expectSubscription() // 验证订阅成功// 向前推进3秒(覆盖所有延迟).thenAwait(Duration.ofSeconds(3)).expectNext(1, 2, 3) // 验证所有元素被发射.expectComplete().verify();
}
关键说明:
withVirtualTime(Supplier<Publisher>)
:启用虚拟时间,流的时间操作(如delay
)会基于虚拟时钟而非真实时间。thenAwait(Duration)
:手动推进虚拟时间,无需实际等待,大幅提升测试速度。
真实时间超时
@Test
public void testWithTimeout() {Flux<Long> slowFlux = Flux.interval(Duration.ofSeconds(2)) // 每 2 秒发一个.take(1); // 只取一个StepVerifier.create(slowFlux, StepVerifierOptions.create().withVirtualTime()).expectNoEvent(Duration.ofSeconds(1)) // 1 秒内无事件.expectNextCount(1) // 之后收到 1 个.expectComplete().verify(Duration.ofSeconds(3)); // 整体超时 3 秒
}
✅ 虚拟时间优势:测试 interval
, delay
, timeout
等操作符时,无需真实等待,大幅提升测试速度。
8.3.6 验证资源清理与取消
@Test
public void testCancel() {Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100));StepVerifier.create(intervalFlux).expectNextCount(2) // 收到 2 个.thenCancel() // 主动取消订阅.verify(); // 验证取消成功
}
8.4 测试副作用与状态变化
响应式流中常包含副作用操作(如doOnNext
、doOnError
、doFinally
),需验证这些副作用是否按预期执行(如日志记录、状态更新)
@Test
public void testSideEffects() {List<Integer> processed = new ArrayList<>(); // 记录副作用执行结果// 待测试的流:发射1-3,每次发射后执行副作用(添加到list)Flux<Integer> fluxWithSideEffect = Flux.range(1, 3).doOnNext(num -> processed.add(num)) // 副作用:记录处理的元素.doOnComplete(() -> processed.add(-1)); // 完成时添加标记StepVerifier.create(fluxWithSideEffect).expectNext(1, 2, 3).expectComplete().verify();// 验证副作用是否按预期执行assert processed.equals(List.of(1, 2, 3, -1));
}
8.3.7 验证多个元素
@Test
public void testExpectNexts() {Flux<Integer> flux = Flux.range(1, 3); // 1, 2, 3StepVerifier.create(flux).expectNext(1, 2, 3) // 一次性验证多个元素.expectComplete().verify();// 或者使用集合StepVerifier.create(flux).expectNextSequence(List.of(1, 2, 3)).expectComplete().verify();
}
8.5 TestPublisher:手动控制信号发射
TestPublisher
用于模拟一个 Publisher
,常用于单元测试中替换外部依赖(如数据库、HTTP 客户端)
TestPublisher
用于手动发送信号(next
/error
/complete
),适合测试订阅者(或操作符)对异常信号序列的响应(如重复发射、提前完成等)。
8.5.1 模拟正常数据流
@Test
public void testWithTestPublisher() {// 1. 创建 TestPublisherTestPublisher<String> testPublisher = TestPublisher.create();// 2. 被测试的服务使用这个 publisherServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);// 3. 手动发送数据testPublisher.next("Data1", "Data2");testPublisher.complete();// 4. 验证服务行为(假设服务有回调或状态)// 例如:assertThat(service.getProcessedData()).contains("Data1", "Data2");
}
8.5.2 模拟错误流
@Test
public void testErrorWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();ServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);testPublisher.error(new IOException("Network failed"));// 验证服务正确处理错误// assertThat(service.hasError()).isTrue();
}
8.5.3 模拟背压请求
@Test
public void testBackpressureWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();StepVerifier.create(testPublisher.flux()).thenRequest(1) // 主动请求 1 个.expectNext("dummy") // TestPublisher 默认发送 dummy 值.then(() -> testPublisher.next("A")) // 发送真实数据.expectNext("A").thenRequest(1).then(() -> testPublisher.next("B")).expectNext("B").thenCancel().verify();
}
8.5.4 模拟错误处理
@Test
public void testInvalidSignalSequence() {// 创建一个严格模式的TestPublisher(不允许无效信号序列)TestPublisher<Integer> testPublisher = TestPublisher.create();// 手动发送信号:先发射元素,再发送错误,最后尝试发射元素(无效操作)testPublisher.next(1);testPublisher.error(new RuntimeException("测试错误"));testPublisher.next(2); // 错误后发射元素是无效的// 验证订阅者是否能捕获无效信号导致的错误Flux<Integer> flux = testPublisher.flux();StepVerifier.create(flux).expectNext(1).expectError(RuntimeException.class)// 验证错误消息(包含无效信号的描述).verify();
}
关键说明:
TestPublisher.createStrict()
:严格模式,会检查信号序列的合法性(如错误 / 完成后不能再发射元素),非法操作会抛出异常。TestPublisher.createNonStrict()
:非严格模式,允许非法信号序列,适合测试容错逻辑。
8.6 并发与线程切换测试
测试涉及线程切换(publishOn
/subscribeOn
)的流时,需确保操作在预期线程上执行,且并发场景下行为正确。
示例:验证线程切换后的执行线程
@Test
public void testThreadSwitch() {// 待测试的流:先在boundedElastic读取,再在parallel处理Flux<Integer> flux = Flux.range(1, 2).subscribeOn(Schedulers.boundedElastic()).doOnNext(num -> {// 验证上游操作在boundedElastic线程执行assert Thread.currentThread().getName().startsWith("boundedElastic");}).publishOn(Schedulers.parallel()).doOnNext(num -> {// 验证下游操作在parallel线程执行assert Thread.currentThread().getName().startsWith("parallel");});StepVerifier.create(flux).expectNext(1, 2).expectComplete().verify();
}
8.7 测试最佳实践
- 优先使用
StepVerifier
:它覆盖了 90% 以上的测试场景,声明式语法清晰且能验证完整流生命周期。 - 虚拟时间加速测试:对含
delay
、timeout
的流,用withVirtualTime
避免实际等待,测试速度提升 10 倍以上。 - 验证副作用:通过外部状态(如
List
、AtomicInteger
)记录副作用,确保doOnXXX
操作按预期执行。 - 严格模式测试边界情况:用
TestPublisher
的严格模式测试非法信号(如重复完成、错误后发射元素),验证流的容错性。 - 隔离测试:每个测试方法应独立,避免共享状态(如调度器、计数器)导致测试相互干扰。
- 断言具体化:使用
expectErrorMatches
而非expectError()
,精准验证错误类型和消息;用expectNextSequence
验证批量元素。
8.8 StepVerifier工作流程
九、实际应用场景
9.1 Web应用中的响应式处理
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class WebApplicationExamples {// 使用WebClient进行HTTP调用public Flux<User> getUsersWithPosts() {WebClient webClient = WebClient.create("https://api.example.com");return webClient.get().uri("/users").retrieve().bodyToFlux(User.class).flatMap(user -> webClient.get().uri("/users/{id}/posts", user.getId()).retrieve().bodyToFlux(Post.class).collectList().map(posts -> {user.setPosts(posts);return user;})).timeout(Duration.ofSeconds(5)).onErrorResume(e -> {// 记录错误但继续处理其他用户System.err.println("Error fetching posts: " + e.getMessage());return Mono.just(new User()); // 返回空用户或默认值});}// 批量处理与缓冲public Flux<Result> processInBatches(Flux<Item> items) {return items.buffer(100) // 每100个元素一批.delayElements(Duration.ofMillis(500)) // 控制速率.flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.parallel()));}private Mono<Result> processBatch(List<Item> batch) {return Mono.fromCallable(() -> {// 模拟批量处理return new Result("Processed " + batch.size() + " items");});}// 实时数据流处理public Flux<Event> processRealTimeEvents(Flux<Event> eventStream) {return eventStream.window(Duration.ofSeconds(1)) // 1秒窗口.flatMap(window -> window.groupBy(Event::getType).flatMap(group -> group.reduce((e1, e2) -> new Event(e1.getType(), e1.getValue() + e2.getValue()))));}
}// 简单的数据模型
class User {private String id;private List<Post> posts;// getters and setters
}class Post {private String id;private String content;// getters and setters
}class Item {private String id;// getters and setters
}class Result {private String message;// constructor, getters
}class Event {private String type;private int value;// constructor, getters
}
9.2 数据库响应式访问
伪代码,需要引入其它的依赖项
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class DatabaseExamples {private final R2dbcEntityTemplate template;public Flux<User> findActiveUsers() {return template.select(User.class).from("users").matching(where("active").is(true)).all().delayElements(Duration.ofMillis(10)) // 控制数据库压力.onBackpressureBuffer(1000); // 缓冲背压}public Mono<Void> saveUsersInTransaction(Flux<User> users) {return template.inTransaction(() -> users.buffer(100) // 每100个用户一批提交.flatMap(batch -> template.insert(batch).then()));}public Flux<User> findUsersWithRetry() {return template.select(User.class).from("users").all().retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onErrorResume(e -> {System.err.println("Database unavailable: " + e.getMessage());return Flux.empty(); // 返回空流而不是错误});}
}