Spring WebFlux 整合AI大模型实现流式输出
前言
最近赶上AI的热潮,很多业务都在接入AI大模型相关的接口去方便的实现一些功能,后端需要做的是接入AI模型接口,并整合成流式输出到前端,下面有一些经验和踩过的坑。
集成
Spring WebFlux是全新的Reactive Web技术栈,基于反应式编程,很适合处理我们需求的流式数据。
依赖
只需要下面这一个依赖即可,但是需要助力springboot父版本,不同的版本在相关的API实现上面有些许的差别。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
代码
这边我在controller写了一个测试代码,意思是每秒产生一段json数据,一共10次,需要注意,响应头一定要设置text/event-stream 这个值,标志着是流式输出
@GetMapping(path = "/test/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> chatTest() {//chat交互测试return Flux.interval(Duration.ofSeconds(1)).take(10).map(sequence -> "{" + " \"data\": \"33\"," + " \"count\": \"" + sequence + "\"" + "}");}
postman 调用接口测试下,正常返回数据了
后端集成AI大模型
在实际业务中,基本上都是后端来调用 deepseek,再返回给前端,下面大概是集成
public Flux<ServerSentEvent<ObjectNode>> chat() {WebClient webClient = WebClient.create();
String url = "大模型url链接";return webClient.post().uri(url).header("Accept", "text/event-stream").body(BodyInserters.fromObject(reqNode)) // 注意高版本的API 可以直接用 bodyValue().retrieve().bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ObjectNode>>() {}).log().onBackpressureBuffer().doOnError(throwable -> {//错误处理log.error("chat request error -> {}", throwable.getMessage());throw new RuntimeException("request error -> " +throwable.getMessage());}).doOnNext(v -> {//每次输出流处理log.info("received chat message: {}", v);}).doOnComplete(() -> {//流输出完成处理});
一些错误解决
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
报错是由于发布者(Publisher)尝试以比订阅者(Subscriber)请求速率更快的速度推送数据时。这种情况违反了 Reactive Streams 的背压(Backpressure)机制,导致异常抛出。导致流异常终止。
在上面请求时加上了 .onBackpressureBuffer() 用缓冲机制解决