当前位置: 首页 > ops >正文

Spring AI 多模型智能协作工作流实现指南

Spring AI 多模型智能协作工作流实现指南

说明

本文档旨在指导开发者基于 Spring AI 框架,在 Spring Boot 2 环境下集成多种主流大语言模型(如 OpenAI ChatGPT、Deepseek、阿里云通义千问等),并提供从环境配置、模型调用、流式输出到提示模板与异常处理的完整使用示例。文中示例适配 Spring AI 进行开发。本教程适用于对 LLM 应用开发有一定基础的 Java 工程师,亦可作为企业多模型接入与管理的实现参考。

1. 系统架构概述

本方案基于 Spring AI 框架,构建了一个多模型协作的工作流系统,通过 ChatGPT、Deepseek 和通义千问三大模型的优势互补,实现从原始输入到高质量输出的完整处理流程。

以下是基于Mermaid语法的工作流架构图,您可以直接复制到支持Mermaid的Markdown编辑器(如Typora、VS Code插件等)中查看:

异常处理
模型协作工作流
成功
失败
JSON格式
生成草稿
降级处理
使用默认关键词
解析JSON结果
ChatGPT关键词提取
通义千问文本润色
Deepseek内容生成
用户输入文本
最终输出结果

架构图说明:

  1. 流程图元素

    • 矩形框:表示处理步骤
    • 菱形框:表示判断/解析节点
    • 子图:标识不同处理阶段
    • 样式:区分输入/输出节点
  2. 工作流步骤

    A
    B
    C
    D
    F
    G
    E
  3. 带详细说明的版本

原始文本
JSON格式
成功
失败
初稿内容
用户输入
ChatGPT处理器
解析结果?
提取关键词+意图
使用默认值
Deepseek生成器
通义千问润色器
最终输出

关键路径说明:

  1. 正常流程

    用户输入 → ChatGPT提取 → 解析JSON → Deepseek生成 → 通义千问润色 → 输出
    
  2. 异常流程

    解析失败 → 使用默认关键词 → 继续后续流程
    
  3. 组件职责

    • ChatGPT:结构化理解输入文本
    • Deepseek:基于结构化数据生成内容
    • 通义千问:优化表达质量

部署架构图(补充)

云服务
HTTP请求
API调用
API调用
API调用
OpenAI Cloud
Deepseek API
阿里云通义千问
客户端
Spring Boot API
Redis缓存
数据库

2. 环境配置

2.1 Maven 依赖配置

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version></dependency><!-- Spring AI Core --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-core</artifactId><version>0.8.1</version></dependency><!-- OpenAI Starter --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-openai-spring-boot-starter</artifactId><version>0.8.1</version></dependency><!-- 阿里云通义千问 --><dependency><groupId>com.alibaba.cloud.ai</groupId><artifactId>spring-ai-alibaba-starter</artifactId><version>1.0.0-M2</version></dependency><!-- JSON 处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 日志记录 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency>
</dependencies>

2.2 应用配置 (application.yml)

spring:ai:openai:api-key: ${OPENAI_API_KEY}model: gpt-4-turbotemperature: 0.7max-tokens: 1000alibaba:dashscope:api-key: ${TONGYI_API_KEY}model: qwen-maxtemperature: 0.8deepseek:api-key: ${DEEPSEEK_API_KEY}base-url: https://api.deepseek.com/v1model: deepseek-chatapp:workflow:max-retries: 3retry-delay: 1000timeout: 30000

3. 核心组件实现

3.1 模型任务定义

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ModelTask {private String taskId;private TaskType type;private String input;private Map<String, Object> params;public enum TaskType {KEYWORD_EXTRACTION, // 关键词提取CONTENT_GENERATION, // 内容生成TEXT_POLISHING      // 文本润色}
}

3.2 模型执行器接口

public interface ModelExecutor {/*** 执行模型任务* @param task 模型任务* @return 执行结果* @throws ModelExecutionException 模型执行异常*/String execute(ModelTask task) throws ModelExecutionException;/*** 支持的模型类型*/ModelTask.TaskType supportedType();
}

3.3 ChatGPT 执行器实现 (关键词提取)

@Service
@Slf4j
public class ChatGPTExecutor implements ModelExecutor {@Autowiredprivate OpenAiChatClient chatClient;@Overridepublic String execute(ModelTask task) throws ModelExecutionException {try {String prompt = buildExtractionPrompt(task.getInput());log.info("Executing ChatGPT task with prompt: {}", prompt);String response = chatClient.call(prompt);log.info("ChatGPT response: {}", response);return parseJsonResponse(response);} catch (Exception e) {log.error("ChatGPT execution failed", e);throw new ModelExecutionException("ChatGPT processing failed", e);}}private String buildExtractionPrompt(String input) {return """请从以下文本中提取结构化信息,按JSON格式返回:{"keywords": ["关键词1", "关键词2", ...],  // 5-10个核心关键词"intent": "用户意图描述",              // 用户的主要目的"sentiment": "positive/neutral/negative" // 情感倾向}输入文本:""" + input;}private String parseJsonResponse(String response) {// 简化的JSON解析,实际项目中应使用Jackson等库if (response.startsWith("{") && response.endsWith("}")) {return response;}// 处理非标准JSON响应return response.replaceFirst(".*?(\\{.*\\}).*", "$1");}@Overridepublic ModelTask.TaskType supportedType() {return ModelTask.TaskType.KEYWORD_EXTRACTION;}
}

3.4 Deepseek 执行器实现 (内容生成)

@Service
@Slf4j
public class DeepseekExecutor implements ModelExecutor {@Value("${deepseek.api-key}")private String apiKey;@Value("${deepseek.base-url}")private String baseUrl;@Value("${deepseek.model}")private String model;private final RestClient restClient;public DeepseekExecutor() {this.restClient = RestClient.builder().baseUrl(baseUrl).defaultHeader("Authorization", "Bearer " + apiKey).build();}@Overridepublic String execute(ModelTask task) throws ModelExecutionException {try {Map<String, Object> requestBody = buildRequest(task);log.info("Sending request to Deepseek: {}", requestBody);ResponseEntity<Map> response = restClient.post().uri("/chat/completions").body(requestBody).retrieve().toEntity(Map.class);return extractContent(response.getBody());} catch (Exception e) {log.error("Deepseek API call failed", e);throw new ModelExecutionException("Deepseek processing failed", e);}}private Map<String, Object> buildRequest(ModelTask task) {return Map.of("model", model,"messages", List.of(Map.of("role", "system", "content", "你是一个专业的内容生成助手"),Map.of("role", "user", "content", task.getInput())),"temperature", 0.7,"max_tokens", 2000);}private String extractContent(Map<String, Object> response) {// 简化处理,实际项目需要更健壮的解析return ((Map)((List)response.get("choices")).get(0)).get("message").toString();}@Overridepublic ModelTask.TaskType supportedType() {return ModelTask.TaskType.CONTENT_GENERATION;}
}

3.5 通义千问执行器实现 (文本润色)

@Service
@Slf4j
public class TongyiExecutor implements ModelExecutor {@Autowiredprivate TongyiChatClient chatClient;@Overridepublic String execute(ModelTask task) throws ModelExecutionException {try {String polishPrompt = buildPolishPrompt(task.getInput());log.info("Executing Tongyi polishing with prompt: {}", polishPrompt);String response = chatClient.call(polishPrompt);log.info("Tongyi polished response: {}", response);return response;} catch (Exception e) {log.error("Tongyi polishing failed", e);throw new ModelExecutionException("Tongyi polishing failed", e);}}private String buildPolishPrompt(String input) {return """请对以下文本进行专业的中文润色,要求:1. 保持原意不变2. 优化表达流畅度3. 使用更专业的词汇4. 适当调整句式结构5. 确保语法正确需要润色的文本:""" + input;}@Overridepublic ModelTask.TaskType supportedType() {return ModelTask.TaskType.TEXT_POLISHING;}
}

4. 工作流协调器

@Service
@Slf4j
public class WorkflowCoordinator {@Autowiredprivate List<ModelExecutor> executors;@Value("${app.workflow.max-retries}")private int maxRetries;@Value("${app.workflow.retry-delay}")private long retryDelay;@Value("${app.workflow.timeout}")private long timeout;private final Map<ModelTask.TaskType, ModelExecutor> executorMap = new HashMap<>();@PostConstructpublic void init() {executors.forEach(executor -> executorMap.put(executor.supportedType(), executor));}@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))public String processWorkflow(String userInput) throws WorkflowException {long startTime = System.currentTimeMillis();String taskId = UUID.randomUUID().toString();try {// 阶段1: 关键词提取ModelTask extractionTask = new ModelTask(taskId, ModelTask.TaskType.KEYWORD_EXTRACTION,userInput,Map.of("format", "json"));String extractionResult = executeWithTimeout(executorMap.get(ModelTask.TaskType.KEYWORD_EXTRACTION),extractionTask);// 阶段2: 内容生成String generationPrompt = buildGenerationPrompt(extractionResult);ModelTask generationTask = new ModelTask(taskId,ModelTask.TaskType.CONTENT_GENERATION,generationPrompt,Map.of("length", "medium"));String generatedContent = executeWithTimeout(executorMap.get(ModelTask.TaskType.CONTENT_GENERATION),generationTask);// 阶段3: 文本润色ModelTask polishingTask = new ModelTask(taskId,ModelTask.TaskType.TEXT_POLISHING,generatedContent,Map.of("style", "professional"));String finalResult = executeWithTimeout(executorMap.get(ModelTask.TaskType.TEXT_POLISHING),polishingTask);log.info("Workflow completed in {} ms", System.currentTimeMillis() - startTime);return finalResult;} catch (TimeoutException e) {throw new WorkflowException("Workflow timed out", e);} catch (Exception e) {throw new WorkflowException("Workflow execution failed", e);}}private String executeWithTimeout(ModelExecutor executor, ModelTask task) throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();Future<String> future = executorService.submit(() -> executor.execute(task));try {return future.get(timeout, TimeUnit.MILLISECONDS);} finally {executorService.shutdown();}}private String buildGenerationPrompt(String extractionResult) {try {JSONObject json = new JSONObject(extractionResult);String keywords = String.join(", ", json.getJSONArray("keywords").toList());String intent = json.getString("intent");return String.format("根据以下关键词和意图生成专业内容:\n关键词: %s\n意图: %s", keywords, intent);} catch (Exception e) {log.warn("Failed to parse extraction result, using raw input");return "根据以下信息生成专业内容:\n" + extractionResult;}}
}

5. REST 控制器

@RestController
@RequestMapping("/api/ai-workflow")
@Slf4j
public class WorkflowController {@Autowiredprivate WorkflowCoordinator workflowCoordinator;@PostMapping("/process")public ResponseEntity<?> processInput(@RequestBody String userInput) {try {log.info("Received processing request: {}", userInput);String result = workflowCoordinator.processWorkflow(userInput);return ResponseEntity.ok(result);} catch (WorkflowException e) {log.error("Workflow processing error", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Map.of("error", "Processing failed","message", e.getMessage()));}}@GetMapping("/status")public ResponseEntity<?> getStatus() {return ResponseEntity.ok(Map.of("status", "operational","models", List.of("ChatGPT", "Deepseek", "Tongyi")));}
}

6. 异常处理

@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {@ExceptionHandler(WorkflowException.class)public ResponseEntity<?> handleWorkflowException(WorkflowException ex) {log.error("Workflow error occurred: {}", ex.getMessage());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Map.of("error", "AI Workflow Error","message", ex.getMessage(),"timestamp", Instant.now()));}@ExceptionHandler(ModelExecutionException.class)public ResponseEntity<?> handleModelException(ModelExecutionException ex) {log.error("Model execution failed: {}", ex.getMessage());return ResponseEntity.status(HttpStatus.BAD_GATEWAY).body(Map.of("error", "Model Execution Error","message", "One of the AI models failed to process the request","timestamp", Instant.now()));}
}

7. 测试用例

@SpringBootTest
@ActiveProfiles("test")
@Slf4j
public class WorkflowIntegrationTest {@Autowiredprivate WorkflowCoordinator workflowCoordinator;@Testpublic void testFullWorkflow() {String input = "用户反馈:新版App的语音识别功能很准确,但耗电量较大,希望优化电池使用效率";try {String result = workflowCoordinator.processWorkflow(input);log.info("Workflow result:\n{}", result);assertNotNull(result);assertTrue(result.length() > 50);assertTrue(result.contains("语音识别") || result.contains("耗电"));} catch (WorkflowException e) {fail("Workflow execution failed: " + e.getMessage());}}@Testpublic void testTimeoutHandling() {// 模拟超时场景的测试assertThrows(WorkflowException.class, () -> {workflowCoordinator.processWorkflow("test timeout");});}
}

8. 部署与监控建议

8.1 Prometheus 监控配置

# application.yml 添加监控配置
management:endpoints:web:exposure:include: health,info,metrics,prometheusmetrics:export:prometheus:enabled: truetags:application: ai-workflow

8.2 关键指标监控

  1. 工作流执行时间
  2. 各模型调用成功率
  3. API响应时间
  4. 错误率统计
  5. Token使用量

9. 性能优化建议

  1. 缓存层:对常见查询结果实现缓存

    @Cacheable(value = "aiResponses", key = "#userInput.hashCode()")
    public String processWorkflow(String userInput) { ... }
    
  2. 批量处理:支持批量输入处理

  3. 异步处理:对耗时任务实现异步API

    @Async
    public CompletableFuture<String> processAsync(String input) { ... }
    
  4. 连接池配置:优化HTTP连接池

    spring:ai:openai:rest:connection-timeout: 5000read-timeout: 30000max-connections: 100
    

10. 安全建议

  1. API密钥轮换:定期更新各模型API密钥

  2. 输入过滤:防止Prompt注入攻击

    public String sanitizeInput(String input) {return input.replaceAll("[<>\"']", "");
    }
    
  3. 访问控制:实现API访问认证

  4. 请求限流:防止滥用

总结

本实现方案展示了如何利用 Spring AI 框架构建一个多模型协作的智能工作流系统,具有以下特点:

  1. 模块化设计:各模型执行器相互独立,易于扩展
  2. 弹性处理:完善的错误处理和重试机制
  3. 性能可控:超时管理和异步支持
  4. 可观测性:完善的日志和监控支持
  5. 生产就绪:包含安全、缓存等生产环境所需功能

通过这种架构,您可以灵活地替换或添加新的AI模型,同时保持业务逻辑的一致性,为构建企业级AI应用提供了可靠的基础框架。

http://www.xdnf.cn/news/9535.html

相关文章:

  • AI Agent开发第76课-Dify N8n一类的AI流程“出轨“时会爆发什么样的工程灾难
  • 用python制作一个打地鼠游戏
  • 主要国产数据库及其典型应用场景
  • 每天掌握一个Linux命令 - ps
  • 多因素身份鉴别组合方案及应用场景
  • MySQL----视图的创造和使用
  • 篇章六 数据结构——链表(二)
  • 某标杆房企BI平台2.0升级实践
  • 系统思考:心智模式与业务创新
  • LiveGBS海康、大华、宇视、华为摄像头GB28181国标语音对讲及语音喊话:摄像头设备与服务HTTPS准备
  • 工业总线的“F1赛车“与“越野车“:从控制周期解读EtherCAT与CANopen
  • 镍钯金PCB为什么很难做?
  • 伽罗华域(galois field)的乘法计算(异或法)
  • 前后端传输 Long 类型数据时(时间戳,雪花算法ID),精度丢失的根本原因
  • JavaSE核心知识点04工具
  • WebFuture:后台离开站点提示设置关闭后无效
  • 基于Matlab实现指纹识别系统
  • 一招解决 win10 安装 Abobe PR/AE 打不开或闪退
  • 如何在 Solana 上发币,并创建初始流动性让项目真正“动”起来?
  • 12.Java 对象冷冻术:从用户登录到游戏存档的序列化实战
  • 电子电路:开关电路技术深度解析
  • Ubuntu 24.04 LTS 和 ROS 2 Jazzy 环境中使用 Livox MID360 雷达
  • 2025年软件测试面试八股文(含答案+文档)
  • indel_snp_ssr_primer
  • 简历中项目经历怎么写?
  • 硬件服务器基础
  • C++11:系统类型增强
  • ‌ATR2652S双频GNSS低噪声放大器芯片技术解析
  • unityPc端设置了全屏(Exclusive Fullscreen)但是仍然有白边解决办法
  • 在 Linux 中让 ​​Gunicorn​​ 在后台运行(作为守护进程),有几种常用方法: