当前位置: 首页 > backend >正文

适配java8版本的langchian4j实战

springAI和较新的langchain4j都要求jdk最低版本都要17,而很多老项目我们是无法实现统一改jdk版本,但是langchian4j早期版本是适配java8的,这里我来做个实战教一下大家用适配java8版本的langchian4j来调用我们本地用ollama部署的AI大模型。

langchain4j文档:https://docs.langchain4j.info/

首先我们引入依赖(注意版本,高版本是jdk17,造成不兼容)

 <!--langchain4j--><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-open-ai</artifactId><version>0.29.1</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j</artifactId><version>0.29.1</version></dependency>

如果你想实现流式聊天(一个个字词动态接收),需要引入webflux依赖

	   <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

因为我本地是拿ollama进行下载的大模型进行测试,所以大家可以先去:Ollama,下载ollama。

ollama list  # 查看你当前模型列表
ollama run deepseek-r1:1.5b # 启动你本地的对应模型,如果没有下载,则会自动下载

这里我拿deepseek-r1:1.5b来测试,你可以选择更多参数的模型进行实验。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

我们保证本地已经启动了ollama模型并run了一个模型,现在我们回到java程序。

我们是个springboot测试类,引入一些springboot一些依赖,做个接口调用

 		<!--web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--mybatis--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--Redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--redisson--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId></dependency>

测试类:

/*** @Author: fsq* @Date: 2025/6/7 12:00* @Version: 1.0*/
@SpringBootTest
public class ChatTest {@Testpublic void test() throws Exception {OpenAiChatModel chatGlmChatModel = OpenAiChatModel.builder()// Ollama 服务地址.baseUrl("http://localhost:11434/v1")// 模型key.apiKey("EMPTY")// 最大令牌数.maxTokens(1000)// 精确度.temperature(0d)// 超时时间.timeout(Duration.ofSeconds(15))// 模型名称.modelName("deepseek-r1:1.5b")// 重试次数.maxRetries(3).build();String me = "你好!请给我讲一个笑话";System.out.println("用户:" + me);String content = chatGlmChatModel.generate(me);System.out.println("AI:" + content);}@Testpublic static void main(String[] args) {StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder().baseUrl("http://localhost:11434/v1")// 模型key.apiKey("EMPTY")// 最大令牌数.maxTokens(1000)// 精确度.temperature(0d)// 超时时间.timeout(Duration.ofSeconds(15))// 模型名称.modelName("deepseek-r1:1.5b").build();String userMessage = "你好,给我讲个笑话吧~";model.generate(userMessage, new StreamingResponseHandler() {@Overridepublic void onNext(String s) {System.out.println("onPartialResponse: " + s);}@Overridepublic void onComplete(Response response) {System.out.println("onCompleteResponse: " + response);}@Overridepublic void onError(Throwable error) {error.printStackTrace();}});}
}

注意:如果你下载了不同模型,则对应名称也要换,注意是全称!ollama地址http://localhost:11434/v1写死。

测试成功后,我们就开始正式配置到springboot里面

@Configuration
public class Langchain4jConfig {@Value("${langchain4j.base-url}")private String baseUrl;@Value("${langchain4j.api-key}")private String apiKey;@Value("${langchain4j.max-tokens}")private int maxTokens;@Value("${langchain4j.timeout-seconds}")private int timeoutSeconds;@Value("${langchain4j.model-name}")private String modelName;@Value("${langchain4j.max-retries}")private int maxRetries;@Value("${langchain4j.chat-model-temperature}")private double chatModelTemperature;@Value("${langchain4j.streaming-chat-model-temperature}")private double streamingChatModelTemperature;@SystemMessage("你叫小美,是一个智能助手")@Beanpublic ChatLanguageModel qwenChatModel() {return OpenAiChatModel.builder().baseUrl(baseUrl).apiKey(apiKey).maxTokens(maxTokens).temperature(chatModelTemperature).timeout(Duration.ofSeconds(timeoutSeconds)).modelName(modelName).maxRetries(maxRetries).build();}@SystemMessage("你叫小明,是一个智能助手")@Beanpublic StreamingChatLanguageModel qwenStreamingChatModel() {return OpenAiStreamingChatModel.builder().baseUrl(baseUrl).apiKey(apiKey).maxTokens(maxTokens).temperature(streamingChatModelTemperature).timeout(Duration.ofSeconds(timeoutSeconds)).modelName(modelName).build();}
}

controller:

@ApiOperation("普通聊天,非流式")
@GetMapping("/simple")
public String memoryChatRedis(@RequestParam(defaultValue = "我叫finch,你叫什么名字?") String message,@RequestParam(defaultValue = "1") String sessionId) {System.out.println("message = " + message);System.out.println("memoryId = " + sessionId);return chatSessionService.chat(sessionId, message);
}@ApiOperation("流式聊天")
@GetMapping(value = "/",  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> memoryChatRedisStream(@RequestParam(defaultValue = "我叫finch,你叫什么名字?") String message,@RequestParam(defaultValue = "1") String sessionId) {return chatSessionService.stream(sessionId, message);
}

service实现类:

    @Overridepublic String chat(String sessionId, String message) {return chatLanguageModel.generate(message);}@Overridepublic Flux<String> stream(String sessionId, String message) {Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();streamingChatLanguageModel.generate(message, new dev.langchain4j.model.StreamingResponseHandler() {@Overridepublic void onNext(String s) {log.info("{}",s);// 格式化并发送 SSE 消息sink.tryEmitNext(formatSseMessage(s));}@Overridepublic void onComplete(Response response) {sink.tryEmitNext(formatSseMessage("[DONE]"));sink.tryEmitComplete();log.info("数据接收完成!");}@Overridepublic void onError(Throwable error) {sink.tryEmitError(error);}});return sink.asFlux();}

这时候我们发现我们能成功接收到信息。如果我们想形成对话记忆以及聊天记录的存储呢?

根据官方文档,我们需要配置聊天记忆 | LangChain4j 中文文档

这是一个redis的示例:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class PersistentChatMemoryStore implements ChatMemoryStore {private final StringRedisTemplate redisTemplate;private final IChatSessionService chatSessionService;private final DataDelayTaskHandler dataDelayTaskHandler;private final ExecutorService executorService = Executors.newCachedThreadPool();private String getKey(Object sessionId) {//示例:chat:memory:2:1return CHAT_MEMORY_KEY_PREFIX + UserContext.getUser()+":"+ sessionId ;}@Overridepublic List<ChatMessage> getMessages(Object sessionId) {try {List<String> messageList = redisTemplate.opsForList().range(getKey(sessionId), 0, -1);log.info("getMessages messageList:{}", messageList);if (CollUtil.isNotEmpty(messageList)) {String json = messageList.toString();return messagesFromJson(json);}// 获取不到对话历史,则从数据库中获取List<ChatSession> chatSessionList = chatSessionService.lambdaQuery().eq(ChatSession::getUserId, UserContext.getUser()).eq(ChatSession::getSessionId, sessionId).orderByAsc(ChatSession::getSegmentIndex).list();// 判断是否为空if (CollUtil.isNotEmpty(chatSessionList)) {messageList = chatSessionList.stream().map(ChatSession::getContent).collect(Collectors.toList());return messagesFromJson(messageList.toString());}return Collections.emptyList();} catch (Exception e) {log.error("读取对话历史失败", e);return Collections.emptyList();}}@Overridepublic void updateMessages(Object sessionId, List<ChatMessage> messages) {if (messages == null || messages.isEmpty()) {return;}try {// 将最新的一条数据存储到Redis中String json = messageToJson(messages.get(messages.size() - 1));redisTemplate.opsForList().rightPush(getKey(sessionId), json);log.info("存数据到redis中 sessionId{}:json:{}", sessionId,json);// 开启延时任务// 封装实体类Map<String, Object> map = new HashMap<>();map.put("key", getKey(sessionId));map.put("num", messages.size());String jsonStr = JSONUtil.toJsonStr(map);dataDelayTaskHandler.addDelayedTask(jsonStr, 1, TimeUnit.MINUTES);} catch (Exception e) {log.error("更新对话历史失败", e);}}@Overridepublic void deleteMessages(Object sessionId) {try {redisTemplate.delete(getKey(sessionId));} catch (Exception e) {log.error("删除对话历史失败", e);}}
}

对应实体类:

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@Builder
@TableName("chat_session")
public class ChatSession {/*** 聊天记录id*/@TableId(value = "id", type =IdType.ASSIGN_ID)private Long id;/*** 用户ID*/private Long userId;/*** 会话ID*/private String sessionId;/*** 会话片段序号,从0开始*/private Integer segmentIndex;/*** 消息内容,JSON 格式,包含 role、type、text 等*/private String content;/*** 插入时间*/private LocalDateTime createTime;
}

延迟落库工具类:

@Slf4j
@Component
@RequiredArgsConstructor
public class DataDelayTaskHandler {private final RedissonClient redissonClient;private final StringRedisTemplate redisTemplate;private final IChatSessionService chatSessionService;private final IUserSessionService userSessionService;private static volatile boolean begin = true;@PostConstructpublic void init(){CompletableFuture.runAsync(this::handleDelayTask);CompletableFuture.runAsync(this::handleRetryTask);}@PreDestroypublic void destroy(){begin = false;log.debug("延迟任务停止执行!");}public void handleDelayTask() {RBlockingQueue<String> queue = redissonClient.getBlockingQueue(CHAT_DELAY_QUEUE);handleTask(queue);}public void handleRetryTask() {RBlockingQueue<String> retryQueue = redissonClient.getBlockingQueue(CHAT_RETRY_QUEUE);handleTask(retryQueue);}private void handleTask(RBlockingQueue<String> retryQueue) {while (begin) {String task = null;try {task = retryQueue.take();System.out.println("处理延迟任务:" + task);// 获取延迟队列中的 key 和 numJSONObject jsonObject = JSONUtil.parseObj(task);String key = jsonObject.getStr("key");Long num = jsonObject.getLong("num");// 去Redis中获取值Long size = redisTemplate.opsForList().size(key);// 如果Redis中队列的size > num,则说明用户还在聊天,则不处理if (size > num) {continue;}// size <= num,则说明用户已经结束聊天,则从Redis中队列中取出所有数据,并保存到数据库中// 查询Redis中的数据List<String> contentList = redisTemplate.opsForList().range(key, 0, -1);// 查询数据库中最后保存的数据String[] split = key.split(":");Integer count = userSessionService.lambdaQuery().eq(UserSession::getSessionId, split[3]).count();if(count==0){//这个会话已经被删除了,延迟同步不用做了continue;}List<ChatSession> lastContents = chatSessionService.lambdaQuery().eq(ChatSession::getUserId, split[2]).eq(ChatSession::getSessionId, split[3]).orderByDesc(ChatSession::getSegmentIndex).list(); // 查询所有匹配的数据ChatSession lastContent = !lastContents.isEmpty() ? lastContents.get(0) : null; // 取第一条数据或者int index = 0;if (ObjectUtil.isNotEmpty(lastContent)) {index = lastContent.getSegmentIndex();}List<ChatSession> chatSessionList = new ArrayList<>();for (int i = index + 1; i < contentList.size(); i++) {ChatSession chatSession = ChatSession.builder().userId(Long.valueOf(split[2])).sessionId(split[3]).segmentIndex(i).content(contentList.get(i)).createTime(LocalDateTime.now()).build();chatSessionList.add(chatSession);}chatSessionService.saveBatch(chatSessionList);redisTemplate.delete(key);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;} catch (Exception e) {// 可记录日志、发送告警、写入失败队列log.error("处理延迟任务失败,准备重试: {}", task, e);JSONObject taskJson = JSONUtil.parseObj(task);int retryCount = taskJson.getInt("retryCount", 0);if (retryCount < 3) {taskJson.set("retryCount", retryCount + 1);// 重试延迟 10 秒addRetryTask(taskJson.toString());log.info("任务 {} 重试第 {} 次", taskJson.getStr("key"), retryCount + 1);} else {log.error("任务最终失败,加入死信队列: {}", taskJson);redisTemplate.opsForList().rightPush("chat-dead-letter-queue", taskJson.toString());}}}}public void addDelayedTask(String task, long delay, TimeUnit unit) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(CHAT_DELAY_QUEUE);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(task, delay, unit);}private void addRetryTask(String task) {RBlockingQueue<String> retryBlockingQueue = redissonClient.getBlockingQueue(CHAT_RETRY_QUEUE);RDelayedQueue<String> retryDelayedQueue = redissonClient.getDelayedQueue(retryBlockingQueue);retryDelayedQueue.offer(task, 10, TimeUnit.SECONDS);}}

对于前端展示,我这里采用vue展示。如果是非流式处理我们就跟正常的http调用一样即可。而对于流式聊天的我们要使用@microsoft/fetch-event-source组件并且引入

因为如果流式处理的话是SSE连接,前端配置要复杂一点。

import { fetchEventSource } from '@microsoft/fetch-event-source';await fetchEventSource(`${AI_API_PREFIX}/chat/?${queryParams.toString()}`, {method: 'GET',headers: {'Accept': 'text/event-stream',"authorization": TOKEN},signal: abortController.value.signal,openWhenHidden: true, // 保持连接即使页面不可见onopen(response) {if (!response.ok || response.headers.get('content-type') !== 'text/event-stream') {throw new Error(`请求失败: ${response.status}`);}},onmessage(msg) {// 后端主动关闭时会发送[DONE]事件if (msg.data === '[DONE]') {assistantMessage.isTyping = false;isStreaming.value = false;abortController.value.abort(); // 主动关闭连接console.log('SSE 数据接收完成');return;}if (msg.data) {if (msg.data === '<think>') {inThinkingTag = true;} else if (msg.data === '</think>') {inThinkingTag = false;if (thinkingContent.trim() === '') {thinkingContent = '';}assistantMessage.thinkingContent = thinkingContent;thinkingContent = '';} else if (inThinkingTag) {thinkingContent += msg.data;} else {content += msg.data;const processed = processContent(content);assistantMessage.processedContent = processed.content;assistantMessage.showMarkdown = processed.showMarkdown;}scrollToBottom();}},onclose() {// 连接关闭时清理状态assistantMessage.isTyping = false;isStreaming.value = false;},onerror(err) {console.error('流式传输错误:', err);// 不自动重试assistantMessage.isTyping = false;isStreaming.value = false;if (abortController.value) {abortController.value.abort();}// 只有非主动中断的错误才显示if (err.name !== 'AbortError') {assistantMessage.content = '对话出错: ' + (err.message || '连接中断');}}});

SSE与Websocket区别:Websocket讲究客户端和服务端一起双向通信互相发送消息,而SSE只是服务端发送消息,客户端只接受。

这就是我的一个简单demo,详细代码可以查看:github.com里的tj-chat模块

http://www.xdnf.cn/news/13164.html

相关文章:

  • 【黑客与安全】Linux的常用命令之系统架构信息获取系列命令
  • 深入解析C#表达式求值:优先级、结合性与括号的魔法
  • Tauri2学习笔记
  • 带传动---
  • git: early EOF
  • 自我堕落公式证明法:你为谁而活
  • 火山 RTC 引擎11----集成创建房间、加入房间、销毁引擎 到互动项目中
  • [Java 基础]Object 类
  • 【题解-洛谷】B4292 [蓝桥杯青少年组省赛 2022] 路线
  • R语言速释制剂QBD解决方案之二
  • 网站指纹识别
  • 博图 SCL 编程技巧:灵活实现上升沿与下降沿检测案例分享(下)
  • 交通自动气象站的作用
  • [Linux] 命令行管理文件
  • 国产三维CAD皇冠CAD(CrownCAD)建模教程:压力变送器
  • 如何开发ONLYOFFICE协作空间插件:完整教程
  • AI高考志愿助手应用架构设计并上线实施运行
  • 使用python进行图像处理—图像变换(6)
  • 前端开发面试题总结-vue2框架篇(一)
  • MES系统如何解决电机制造业自动化生产管理?
  • 回溯算法学习
  • PCIe-8622工业级网卡特性解析
  • Linux中《基础IO》详细介绍
  • leetcode刷题经验
  • 云安全与网络安全:核心区别与协同作用解析
  • 统计学(第8版)——统计抽样学习笔记(考试用)
  • 使用 Python 正则表达式实现文本替换与电话号码规范化
  • 位运算求最大值的子集数目问题
  • Ace网络验证软件卡密系统-免费免搭建 记录整理
  • 如何让非 TCP/IP 协议驱动屏蔽 IPv4/IPv6 和 ARP 报文?