SpringBoot学习日记 Day9:响应式编程新世界探索
一、开篇:从阻塞到响应式的思维转变
第一次接触响应式编程时,我仿佛打开了新世界的大门!传统的阻塞式编程就像单车道,一次只能过一辆车;而响应式编程就像多车道立交桥,车辆(请求)可以并行通过。让我们开始这段奇妙的旅程!
二、响应式基础:理解核心概念
1. 阻塞式 vs 响应式对比
传统阻塞式模型:
@GetMapping("/user/{id}")
public User getUser(@PathVariable String id) {// 每个请求占用一个线程,线程阻塞等待数据库响应User user = userRepository.findById(id); // 阻塞!return user;
}
响应式模型:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {// 线程不阻塞,立即返回Publisher,数据就绪时推送return userRepository.findById(id); // 非阻塞!
}
2. Reactor核心类:Mono和Flux
Mono:0-1个结果的异步序列
Mono<String> mono = Mono.just("Hello") // 创建包含单个值的Mono.map(String::toUpperCase) // 转换操作.delayElement(Duration.ofSeconds(1)); // 延迟1秒// 订阅消费
mono.subscribe(result -> System.out.println("Received: " + result));
Flux:0-N个结果的异步序列
Flux<Integer> flux = Flux.range(1, 5) // 创建1-5的序列.filter(n -> n % 2 == 0) // 过滤偶数.map(n -> n * 2) // 乘以2.delayElements(Duration.ofMillis(500)); // 每个元素延迟500ms// 订阅消费
flux.subscribe(data -> System.out.println("Data: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Completed!")
);
3. 背压(Backpressure)概念
生产者-消费者模型:
生产者: Flux.range(1, 1000) // 快速生产
消费者: .limitRate(10) // 慢速消费通过背压机制,消费者可以控制生产速度,避免被数据淹没
三、创建第一个WebFlux应用
1. 项目初始化
使用Spring Initializr选择:
- Spring Reactive Web
- Spring Data R2DBC
- H2 Database
- Lombok
2. 响应式Controller实战
@RestController
@RequestMapping("/reactive/articles")
public class ReactiveArticleController {private final ReactiveArticleRepository articleRepo;// 获取所有文章(流式返回)@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<Article> getAllArticles() {return articleRepo.findAll().delayElements(Duration.ofMillis(100)) // 模拟流式效果.doOnNext(article -> log.info("发送文章: {}", article.getTitle()));}// 获取单篇文章@GetMapping("/{id}")public Mono<Article> getArticle(@PathVariable String id) {return articleRepo.findById(id).switchIfEmpty(Mono.error(new ArticleNotFoundException()));}// 创建文章@PostMappingpublic Mono<Article> createArticle(@RequestBody Article article) {return articleRepo.save(article);}// 实时文章计数@GetMapping("/count")public Mono<Long> getArticleCount() {return articleRepo.count();}
}
四、响应式数据访问:R2DBC实战
1. 实体类配置
@Data
@Table("articles")
public class Article {@Idprivate String id;private String title;private String content;private String author;@CreatedDateprivate LocalDateTime createTime;
}
2. 响应式Repository
public interface ReactiveArticleRepository extends ReactiveCrudRepository<Article, String> {// 根据作者查询Flux<Article> findByAuthor(String author);// 标题模糊查询Flux<Article> findByTitleContaining(String keyword);// 自定义查询@Query("SELECT * FROM articles WHERE create_time > $1")Flux<Article> findRecentArticles(LocalDateTime date);
}
3. 响应式操作链
public Flux<Article> getRecentPopularArticles() {return articleRepo.findByCreateTimeAfter(LocalDateTime.now().minusDays(7)).filter(article -> article.getViewCount() > 1000) // 过滤热门文章.sort(Comparator.comparing(Article::getViewCount).reversed()) // 按浏览量排序.take(10) // 取前10条.onErrorResume(e -> {log.error("查询失败", e);return Flux.empty(); // 错误处理}).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 重试机制
}
五、响应式MongoDB集成
1. 配置依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
2. MongoDB响应式Repository
public interface ReactiveCommentRepository extends ReactiveMongoRepository<Comment, String> {Flux<Comment> findByArticleId(String articleId);Flux<Comment> findByAuthorAndCreateTimeAfter(String author, LocalDateTime createTime);
}
六、晚间实战:博客系统响应式改造
1. 文章列表接口改造
改造前(阻塞式):
@GetMapping
public List<Article> getArticles() {return articleRepository.findAll(); // 阻塞等待所有数据
}
改造后(响应式):
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Article> getArticlesStream() {return articleRepository.findAll().delayElements(Duration.ofMillis(50)) // 流式输出.doOnSubscribe(sub -> log.info("开始流式传输文章数据"));
}
2. 响应式测试工具:WebTestClient
@SpringBootTest
@AutoConfigureWebTestClient
class ReactiveArticleControllerTest {@Autowiredprivate WebTestClient webTestClient;@Testvoid shouldGetArticlesStream() {webTestClient.get().uri("/reactive/articles").accept(MediaType.TEXT_EVENT_STREAM).exchange().expectStatus().isOk().expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM).expectBodyList(Article.class).hasSize(5);}@Testvoid shouldHandleErrors() {webTestClient.get().uri("/reactive/articles/not-exist").exchange().expectStatus().isNotFound();}
}
3. 性能对比测试
测试场景:1000并发请求查询文章列表
模式 | 平均响应时间 | 内存占用 | 线程数
----------|-------------|---------|--------
阻塞式 | 1200ms | 高 | 100+
响应式 | 350ms | 低 | 10-20
七、学习心得与避坑指南
1. 思维转变挑战
- 从同步到异步:需要适应"先返回Promise,后处理数据"的模式
- 调试困难:调用栈变得复杂,需要熟练使用调试工具
- 错误处理:传统的try-catch不再适用,要用操作符处理
2. 常见问题解决
问题1:`block()/blockFirst()/blockLast()`在生产环境的使用
- 原因:会阻塞线程,破坏响应式优势
- 解决:始终保持在响应式链中处理
问题2:混合使用阻塞和非阻塞代码
- 现象:性能反而下降
- 解决:用`publishOn`/`subscribeOn`指定调度器
问题3:内存泄漏
- 原因:未正确取消订阅
- 解决:使用`timeout`/`take`等操作符限制
3. 适用场景建议
- 推荐:高并发IO密集型应用(网关、消息推送)
- 谨慎:CPU密集型计算(优势不明显)
- 避免:已有大量阻塞代码的项目改造
八、明日预告
响应式编程进阶:
1. 响应式事务管理
2. WebSocket实时通信
3. 响应式安全控制
4. 背压策略实战
思考题:在电商系统的秒杀场景中,响应式编程相比传统阻塞式有什么优势?可能会遇到哪些新的挑战?欢迎在评论区分享你的见解!
如果觉得这篇响应式编程入门有帮助,请点赞收藏支持~完整示例代码可以通过私信获取。在实际项目中尝试响应式改造时遇到的问题,也欢迎留言讨论!