当前位置: 首页 > ds >正文

Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成

Java+AI开发实战与知识点归纳系列:Spring流式输出实战——LangChain4j与Ollama集成

该系列会以java+AI开发为主线,"顺路"的讲解许多java知识,同时学会AI应用的基本开发。每一篇都是完整可运行的代码。文末有项目目录结构,对照文章内容即可完成开发~
此外本文附带了完整的项目代码

如果文章内容有不流畅或讲解缺失,请随时评论区留言~

在上一篇文章中,我们学会了如何通过Spring配置管理LangChain4j与Ollama的集成,实现了"改配置不用改代码"的灵活部署。但是,当我们实际使用AI聊天应用时,会发现一个问题:用户提问后需要等待很久才能看到完整的回答,体验很不好。

今天我们要解决的就是这个问题:如何实现AI回答的流式输出,让用户能够实时看到AI的思考过程,就像ChatGPT那样一个字一个字地"打字"出来

一、从问题出发:为什么需要流式输出?(如果觉得老生常谈,直接跳下一节就好)

想象一下这样的场景:你问AI一个复杂问题,比如"请详细解释Spring Boot的自动配置原理",如果使用传统的同步方式,你可能需要等待10-30秒才能看到完整答案。这种等待是很煎熬的,用户不知道系统是否还在工作,甚至可能以为程序卡死了。

而流式输出就像真人对话一样,AI一边思考一边说话,用户能实时看到回答的进展,大大提升了交互体验。这就是我们今天要实现的目标:

  1. 实时响应:用户发送消息后立即开始接收AI的回答
  2. 流式传输:AI的回答一个词一个词地传输到前端
  3. 优雅处理:妥善处理网络异常、模型错误等边界情况

二、技术选型:为什么选择WebFlux?

要实现流式输出,我们需要选择合适的技术栈。传统的Spring MVC基于Servlet API,天然是阻塞式的,不太适合处理长时间的流式响应。而Spring WebFlux基于Reactor模式,天生支持异步非阻塞,是实现流式输出的最佳选择。

WebFlux vs Spring MVC对比(在实现的时候,仔细考虑过选SseEmitter还是flux的,最终选择了WebFlux)

特性Spring MVCSpring WebFlux
编程模型阻塞式非阻塞式
线程模型一个请求一个线程少量线程处理大量请求
流式支持有限支持原生支持
学习成本较低较高

在我们的项目中,你会看到pom.xml中引入的是spring-boot-starter-webflux而不是spring-boot-starter-web,这就是为了支持响应式编程。

三、核心实现:一步步构建流式聊天

在开始实现之前,我想说如果不了解响应式编程,可能会对WebFlux的概念和使用方式感到陌生。但是,我会尽量用简单的语言和代码示例,帮助你理解流式输出的实现原理。如果刚入门确实难以看懂,AI时代,我强烈建议你合理使用AI工具,提示词也给你准备好了,5分钟后再切回来吧~

提示词:你是一个技术专家也是一个优秀的讲师。请快速的为我介绍响应式编程的基本概念和原理,为我介绍springwebflux。然后给我一个示例,是基于springwebflux的,并针对示例为我一步步讲解。目标是让我快速理解和掌握响应式编程的核心思想,熟悉springwebflux实现的响应式编程。我希望你讲的通俗易懂,自顶向下,由浅入深一些。

第一步:配置响应式Web环境

首先,我们需要在pom.xml中引入WebFlux依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

这个依赖会自动引入Reactor Core、Netty等响应式编程所需的核心库。与传统的Tomcat不同,WebFlux默认使用Netty作为Web服务器,Netty的事件循环模型天然支持高并发的异步处理。

第二步:设计流式响应的Controller

接下来是关键的Controller设计。我们需要返回一个Flux<String>类型,这是Reactor中表示多个异步数据流的类型:

@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {private final ChatService chatService;@PostMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)public Flux<String> streamChat(@RequestBody ChatRequest request) {return chatService.streamChat(request.getMessage());}
}

这里有几个关键点需要理解:

  1. MediaType.APPLICATION_STREAM_JSON_VALUE:这个媒体类型告诉浏览器这是一个流式响应,数据会分批次发送
  2. Flux:Flux是Reactor中的核心类型,表示0到N个异步序列元素
  3. @RequestBody ChatRequest:我们仍然可以正常接收JSON请求体

第三步:实现核心的流式服务

最核心的部分是ChatService,这里我们需要将LangChain4j的回调模式转换为Reactor的流式模式:

@Service
@RequiredArgsConstructor
public class ChatService {private final StreamingChatLanguageModel streamingChatLanguageModel;public Flux<String> streamChat(String message) {return Flux.create(sink -> {try {// 发送初始连接成功消息sink.next("I am coming! \n");// 使用StreamingChatLanguageModel的chat方法streamingChatLanguageModel.chat(message,new StreamingChatResponseHandler() {@Overridepublic void onPartialResponse(String partialResponse) {sink.next(partialResponse);}@Overridepublic void onCompleteResponse(ChatResponse completeResponse) {sink.complete();}@Overridepublic void onError(Throwable error) {// 优雅的错误处理handleStreamError(sink, error);}});} catch (Exception e) {sink.error(e);}}, FluxSink.OverflowStrategy.BUFFER);}private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";System.err.println("警告: 捕获到空指针异常: " + errorMsg);if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}try {System.err.println("错误: " + error.getClass().getName() + ": " + error.getMessage());sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();} catch (Exception e) {sink.error(error);}}
}

这段代码的核心思想是桥接模式:将LangChain4j的回调式API转换为Reactor的流式API。让我们深入理解几个关键概念:

Flux.create()的工作原理

Flux.create()是Reactor提供的工厂方法,用于从外部的异步源创建Flux。它接收一个Consumer参数,FluxSink就像一个"水龙头",我们可以通过它向下游发送数据:

  • sink.next(data):发送一个数据元素
  • sink.complete():表示数据流结束
  • sink.error(throwable):发送错误信号
StreamingChatResponseHandler回调处理

LangChain4j使用回调模式处理流式响应,我们需要实现三个关键方法:

  1. onPartialResponse():每当模型生成一小段文本时调用
  2. onCompleteResponse():当模型完成整个回答时调用
  3. onError():当发生错误时调用
背压处理策略

FluxSink.OverflowStrategy.BUFFER指定了背压处理策略。背压是响应式编程中的重要概念,当生产者产生数据的速度超过消费者处理速度时,就会发生背压。BUFFER策略会缓存所有数据,适合我们这种场景。

第四步:配置Ollama流式模型

我们需要配置LangChain4j的流式聊天模型。注意这里使用的是StreamingChatLanguageModel而不是普通的ChatLanguageModel

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "langchain4j.chat", name = "enable", havingValue = "ollama")
public class OllamaConfig {private final OllamaProperties ollamaProperties;@Beanpublic StreamingChatLanguageModel streamingChatLanguageModel() {log.info("正在初始化Ollama流式聊天模型...");return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();}
}

这里的@ConditionalOnProperty注解实现了条件化配置,只有当langchain4j.chat.enable=ollama时才会创建这个Bean。这种设计让我们可以轻松切换不同的AI模型提供商。

四、前端实现:处理流式数据

后端实现了流式输出,前端也需要相应地处理流式数据。我们使用Fetch API的ReadableStream来处理:

function sendMessage() {const message = messageInput.value.trim();if (!message) return;// 添加用户消息addMessage(message, true);messageInput.value = '';// 发送请求并处理流式响应fetch('/api/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({ message: message })}).then(response => {if (!response.ok) {throw new Error('网络响应不正常');}// 创建流式读取器const reader = response.body.getReader();const decoder = new TextDecoder();// 处理数据流function processStream() {return reader.read().then(({ done, value }) => {if (done) {completeStreamingMessage();return;}// 解码接收到的数据const text = decoder.decode(value, { stream: true });addStreamingMessage(text);return processStream();});}return processStream();}).catch(error => {console.error('Error:', error);addMessage('发生错误: ' + error.message, false);});
}

关键技术点解析

ReadableStream API

response.body.getReader()返回一个ReadableStreamDefaultReader,它可以逐块读取响应体数据。这是浏览器原生支持的流式API,非常适合处理服务器发送的流式数据。

TextDecoder的流式解码

new TextDecoder()创建一个文本解码器,decode(value, { stream: true })中的stream: true参数很重要,它告诉解码器这是流式数据,可能存在跨块的字符边界。

递归处理模式

processStream()函数使用递归调用来持续读取数据流,直到done为true表示流结束。这种模式简洁而高效。

五、错误处理与优化

在实际应用中,我们需要考虑各种异常情况:

1. 网络异常处理

private void handleStreamError(FluxSink<String> sink, Throwable error) {if (error instanceof NullPointerException) {// 处理模型返回空响应的情况String errorMsg = error.getMessage() != null ? error.getMessage() : "未知空指针异常";if (errorMsg.contains("getMessage()") || errorMsg.contains("getContent()")) {sink.next("\n[系统提示: 模型返回了空响应,请重试]");sink.complete();return;}}// 通用错误处理sink.next("\n[系统错误: " + error.getMessage() + "]");sink.complete();
}

2. 超时处理

在application.yml中配置合理的超时时间:

langchain4j:chat:ollama:timeout: 120s  # 2分钟超时

3. 前端连接管理

// 关闭之前的连接(如果有)
if (eventSource) {eventSource.close();
}

六、性能优化与最佳实践

1. 背压控制

当AI生成速度很快时,可能会产生背压。我们使用BUFFER策略,但在生产环境中可能需要考虑DROP或ERROR策略:

Flux.create(sink -> {// 实现逻辑
}, FluxSink.OverflowStrategy.BUFFER);  // 可以改为DROP或ERROR

2. 内存管理

流式处理需要注意内存使用,避免大量数据积压:

// 在配置中限制缓冲区大小
@Bean
public StreamingChatLanguageModel streamingChatLanguageModel() {return OllamaStreamingChatModel.builder().baseUrl(ollamaProperties.getBaseUrl()).modelName(ollamaProperties.getModelName()).temperature(ollamaProperties.getTemperature()).timeout(ollamaProperties.getTimeout()).build();
}

3. 日志监控

添加适当的日志来监控流式处理的性能:

@Override
public void onPartialResponse(String partialResponse) {log.debug("接收到部分响应: {} 字符", partialResponse.length());sink.next(partialResponse);
}

七、项目结构总览

src/
└── main/├── java/│   └── com/example/ollamademo/│       ├── OllamaDemoApplication.java          # 启动类│       ├── config/│       │   ├── OllamaConfig.java              # Ollama配置│       │   └── properties/│       │       └── OllamaProperties.java      # 配置属性│       ├── controller/│       │   ├── ChatController.java            # 流式聊天控制器│       │   └── WebController.java             # 页面控制器│       ├── dto/│       │   └── ChatRequest.java               # 请求DTO│       └── service/│           └── ChatService.java               # 核心流式服务└── resources/├── application.yml                        # 配置文件└── templates/└── index.html                         # 前端页面

八、运行与测试

  1. 启动Ollama服务

    ollama serve
    ollama pull qwen3:8b
    
  2. 启动Spring Boot应用

    mvn spring-boot:run
    
  3. 访问应用
    打开浏览器访问 http://localhost:8080

  4. 测试流式输出
    输入问题,观察AI回答是否一个字一个字地出现

九、总结与展望

本文主要内容回顾:

  1. 响应式编程基础:理解了Flux、Mono等Reactor核心概念
  2. 流式API设计:掌握了如何设计流式响应的RESTful API
  3. 异步编程模式:学会了回调模式与流式模式的转换
  4. 前端流式处理:掌握了ReadableStream API的使用
  5. 错误处理策略:学会了在流式场景下的异常处理

在下一篇文章中,我们将再走一步,探讨如何实现多轮对话的上下文管理,让AI能够记住之前的对话内容,实现更智能的交互体验。


本系列文章将持续更新,每一篇都是完整可运行的项目。如果你觉得有帮助,欢迎点赞收藏,也欢迎在评论区分享你的想法和问题!
💻完整的代码可以直接下载我上传的资源。

http://www.xdnf.cn/news/20016.html

相关文章:

  • 2025 大数据时代值得考的证书排名前八​
  • TypeScript与JavaScript:从动态少年到稳重青年的成长之路
  • “企业版维基百科”Confluence
  • STM32 - Embedded IDE - GCC - 如何在工程中定义一段 NoInit RAM 内存
  • 爬取m3u8视频完整教程
  • JavaWeb项目在服务器部署
  • 数据结构之----线性表其一---顺序表
  • 弱电太累,职业发展遇瓶颈?那一定不要错过这个技能!
  • 单片机(89C51)---基础知识
  • 阅兵时刻,耐达讯自动化RS485 转 Profinet 网关助力矿山冶金连接迈向辉煌
  • 【大数据技术实战】Flink+DS+Dinky 自动化构建数仓平台
  • 嵌入式 Linux 启动流程详解 (以 ARM + U-Boot 为例)
  • 【ShiMetaPi M4-R1】上手:RK3568B2|开源鸿蒙(OpenHarmony) 应用开发快速上手
  • Vue+Echarts饼图深度美化指南:打造卓越数据可视化体验
  • 深入理解 Java 集合框架:底层原理与实战应用
  • 0元部署私有n8n,免费的2CPU+16GB服务器,解锁无限制的工作流体验
  • ruoyi vue element 实现点击、返回首页收起已经展开的菜单栏
  • SpringBoot 整合 Kafka 的实战指南
  • 《用 Django 构建博客应用:从模型设计到文章管理的全流程实战》
  • 2025年11月GIS应用技术测评考试(附考试资料分享)
  • 【开题答辩全过程】以 校园安全管理系统设计与实现为例,包含答辩的问题和答案
  • Django 命令大全:从入门到精通,开发者必备指南
  • Spring Boot 事务失效的八大原因及解决方案详解
  • 什么是科技成果鉴定测试?成果鉴定测试报告带给企业什么好处?
  • 【54页PPT】基于DeepSeek的数据治理技术(附下载方式)
  • 数据库高可用全方案:Keepalived 故障切换 + LVS (DR) 模式 + MariaDB 主主同步实战案例
  • 深度学习----卷积神经网络的数据增强
  • docker 安装 redis 并设置 volumes 并修改 修改密码(三)
  • 工厂设备物联平台_HawkEye智能运维平台_璞华大数据
  • mac idea 配置了Gitlab的远程地址,但是每次pull 或者push 都要输入密码,怎么办