当前位置: 首页 > news >正文

MailAgentProcess.getInstance

MailAgentProcess.getInstance(app.isPackaged? path.join(resourcePath, 'email_agent').replace('app.asar', 'app.asar.unpacked'): undefined);

牵扯出一堆东西

创建 Mailspring/app/src/mail-agent-process.ts 对象

 constructor(executablePath?: string) {super();// 使用正确的路径设置,处理 Electron 打包后的环境this.executablePath = executablePath || path.join(__dirname, '..', 'email_agent');this.start();}

执行this.start()

//这是一个私有的异步方法,用于启动邮件代理进程
private async start() {// 标志设为 false,表示进程处于活动状态this._killed = false;//启动MCP(Model Context Protocol)服务器。获取返回的端口号和访问令牌。await 等待异步操作完成const { port, accessToken } = await this.startMcpServer();// 动态获取可用端口,使用20000-29999范围(避开常用端口)this._agentPort = await getPort({ port: portNumbers(20000, 29999) });// 更新API URLthis._baseUrl = `http://${this._agentHost}:${this._agentPort}`;// 更新API服务的Agent服务URLthis.updateApiServiceUrl();const config = {port: this._agentPort,'mcp-server': `http://localhost:${port}/mcp`,host: 'localhost', // 确保Agent服务绑定到localhost'app-data-path': app.getPath('userData'),mail_db: path.resolve(app.getPath('userData'), 'edgehill.db'),};const argsMap = Object.fromEntries(Object.entries(config).map(([key, value]) => [`--${key}`, String(value)]));const env = {...process.env,PYTHONPATH: '',PYTHONHOME: '',};const args = Object.entries(argsMap).flatMap(([key, value]) => [key, String(value)]);console.log('[MailAgentProcess] args', args);this.proc = spawn(this.executablePath, args, {stdio: ['pipe', 'pipe', 'pipe'],cwd: path.dirname(this.executablePath),env,});this.proc.on('close', (code, signal) => this.handleExit(code, signal));this.proc.on('error', err => this.emit('error', err));this.proc.stdout.on('data', data => {console.log('[MailAgentProcess] stdout', new TextDecoder().decode(data));});this.proc.stderr.on('data', data => {console.log('[MailAgentProcess] stderr', new TextDecoder().decode(data));});}

自习看看这个startMcpServer

private async startMcpServer() {// 获取10000-19999范围内的可用端口(避开常用端口)const port = await getPort({ port: portNumbers(10000, 19999) });const accessToken = uuidv4();//MCP 是一个用于大语言模型与工具系统之间通信的协议标准。//在 Mailspring 中,这个服务器为邮件代理提供结构化的邮件操作接口。startMcpServerProcess({port,//指定 MCP 服务器监听的端口号,MCP 服务器将在此端口提供 HTTP API 服务host: 'localhost',//指定服务器绑定的主机地址。只允许本地访问,防止外部网络攻击
//用于身份验证的访问令牌。确保只有授权的客户端可以访问 MCP 服务每次启动时重新//生成,提高安全性accessToken,configDirPath: '',//指定 MCP 服务器读取配置文件的目录resourcePath: '',//指定静态资源、模板等文件的位置verbose: true,//设置: true 表示输出详细日志transport: 'streamable-http',//指定 MCP 服务器的传输协议类型});return {port,accessToken,};}

transport值 

  • 可选值:
    • 'streamable-http': 流式HTTP传输(推荐)
    • 'sse': Server-Sent Events 传输
  • 优势:
    • streamable-http: 支持会话管理、可恢复性、更好的错误处理
    • sse: 简单的事件流,适用于单向通信

 

  • 服务器启动后的功能:

    1. 提供邮件操作API(发送、转发、回复、草稿管理)
    2. 会话管理和状态维护
    3. 统计信息收集
    4. 优雅关闭处理
/*** 快速启动 MCP 服务器的便捷函数** @param options 服务器配置选项* @returns Promise<void>*/
export const startMcpServerProcess = async (options: McpServerProcessOptions): Promise<void> => {const server = makeMcpServer(options);await server.listen();
};

/*** 创建并管理 MCP 服务器实例** @param options 服务器配置选项* @returns 服务器管理对象*/
export const makeMcpServer = (options: McpServerProcessOptions) => {const singleton = McpServerSingleton.getInstance(options);return {/*** 启动服务器*/listen: () => singleton.listen(),/*** 关闭服务器*/close: () => singleton.close(),/*** 重启服务器*/restart: () => singleton.restart(),/*** 检查运行状态*/isRunning: () => singleton.isRunning(),/*** 获取配置信息*/getConfig: () => singleton.getConfig(),/*** 获取统计信息*/getStats: () => singleton.getStats(),/*** 获取活跃会话*/getActiveSessions: () => singleton.getActiveSessions(),};
};

到了/Mailspring/app/src/mailsync-mcp-server-process.ts

private constructor(options: McpServerProcessOptions) {this.options = {port: 3001,host: '0.0.0.0',transport: 'streamable-http',verbose: false,...options,};console.log('Initializing Mailspring MCP Server Singleton...', this.options);}static getInstance(options: McpServerProcessOptions): McpServerSingleton {if (!McpServerSingleton.instance) {McpServerSingleton.instance = new McpServerSingleton(options);}return McpServerSingleton.instance;}/*** 启动 MCP 服务器*/async listen(): Promise<void> {if (this.serverInstance) {console.log('MCP Server is already running');return;}try {this.serverInstance = await startMailspringMCPServer({port: this.options.port,configDirPath: this.options.configDirPath,resourcePath: this.options.resourcePath,verbose: this.options.verbose,transport: this.options.transport,});// 设置进程信号处理this.setupSignalHandlers();} catch (error) {this.serverInstance = null;throw error;}}

先getInstance再执行constructor再执行listen

/Mailspring/app/src/mailsync-mcp-server/index.ts

/*** 运行 Streamable HTTP 传输服务器*/
export async function runStreamableHttpServer(port: number,config: {configDirPath: string;resourcePath: string;verbose?: boolean;}
) {const streamableServer = createStreamableHttpServer({port,mcpPath: '/mcp',createMCPServer: () => createMCPServerInstance(config),requireAccessToken: false, // 在生产环境中应启用enableSessionManagement: false,enableJsonResponse: false, // 优先使用 SSE 流式响应bindToLocalhost: true, // 安全:仅绑定到 localhostallowOrigins: ['*'], // 在生产环境中应限制允许的源});await streamableServer.listen();// 定期输出统计信息const statsInterval = setInterval(() => {const stats = streamableServer.getStats();const activeSessions = streamableServer.getActiveSessions();if (stats || activeSessions.length > 0) {}}, 60000); // 每分钟输出一次// 优雅关闭const cleanup = async () => {clearInterval(statsInterval);await streamableServer.close();};return {listen: () => Promise.resolve(),close: cleanup,server: streamableServer,getStats: () => streamableServer.getStats(),getActiveSessions: () => streamableServer.getActiveSessions(),};
}/*** 简化的服务器启动函数,用于进程管理*/
export async function startMailspringMCPServer(options: {port?: number;host?: string;configDirPath: string;resourcePath: string;verbose?: boolean;transport?: 'sse' | 'streamable-http';
}) {const {port = 3001,transport = 'streamable-http',configDirPath,resourcePath,verbose = false,} = options;const config = { configDirPath, resourcePath, verbose };try {let serverInstance;switch (transport) {case 'sse':serverInstance = await runSseServer(port, config);break;case 'streamable-http':serverInstance = await runStreamableHttpServer(port, config);break;default:throw new Error(`Unsupported transport: ${transport}`);}return {listen: serverInstance.listen,close: serverInstance.close,server: serverInstance.server,port,transport,};} catch (error) {throw error;}
}

执行    serverInstance = await runStreamableHttpServer(port, config);

createStreamableHttpServer是一个编译好的包里面的

function createStreamableHttpServer({port = 3001,mcpPath = "/mcp",requireAccessToken = false,getAccessToken = (req) => req.headers["authorization"]?.replace(/^Bearer\s+/i, ""),validateAccessToken = async () => true,createMCPServer,enableSessionManagement = true,allowOrigins = ["*"],bindToLocalhost = true,enableJsonResponse = false,eventStore,defaultSessionId = uuidv42()
} = {}) {if (!createMCPServer) {throw new Error("createMCPServer is required");}const app = express2();const defaultEventStore = eventStore || new InMemoryEventStore();app.use((req, res, next) => {const origin = req.headers.origin;if (bindToLocalhost && req.headers.host && !req.headers.host.includes("localhost") && !req.headers.host.includes("127.0.0.1")) {res.status(403).json({ error: "Invalid host header" });return;}if (allowOrigins.includes("*") || origin && allowOrigins.includes(origin)) {res.setHeader("Access-Control-Allow-Origin", origin || "*");}res.setHeader("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS");res.setHeader("Access-Control-Allow-Headers","Content-Type, Authorization, Accept, Mcp-Session-Id, Last-Event-ID");res.setHeader("Access-Control-Expose-Headers", "Mcp-Session-Id");if (req.method === "OPTIONS") {res.status(200).end();return;}next();});app.use(express2.json({ limit: "10mb" }));const authMiddleware = createAuthMiddleware({requireAccessToken,getAccessToken,validateAccessToken});app.use(authMiddleware);const { server, cleanup } = createMCPServer();const sessions = {};let httpServer;let sessionCleanupInterval;function getOrCreateTransport(req, res) {let sessionId = req.headers["mcp-session-id"];let isNew = false;console.log("prepare transport for sessionId:", sessionId);if (enableSessionManagement && sessionId && sessions[sessionId]) {sessions[sessionId].lastActivity = Date.now();return {sessionId,transport: sessions[sessionId].transport,isNew: false};}isNew = true;const transport = new StreamableHTTPServerTransport({sessionIdGenerator: enableSessionManagement ? () => {if (!sessionId) {sessionId = uuidv42();res.setHeader("Mcp-Session-Id", sessionId);isNew = true;}return sessionId;} : void 0,enableJsonResponse,eventStore: defaultEventStore,onsessioninitialized: (newSessionId) => {console.error(`Session initialized: ${newSessionId}`);sessionId = newSessionId;isNew = true;}});if (enableSessionManagement && sessionId) {sessions[sessionId] = {transport,lastActivity: Date.now()};}return { sessionId, transport, isNew };}function cleanupExpiredSessions() {const now = Date.now();const sessionTimeout = 30 * 60 * 1e3;Object.entries(sessions).forEach(([sessionId, session]) => {if (now - session.lastActivity > sessionTimeout) {console.error(`Cleaning up expired session: ${sessionId}`);if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];}});}app.get(mcpPath, async (req, res) => {try {console.error("Received GET request for SSE connection");if (!req.headers.accept?.includes("text/event-stream")) {res.status(405).json({error: "Method Not Allowed. Expected Accept: text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res);} catch (error) {console.error("Error handling GET request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});app.post(mcpPath, async (req, res) => {try {console.error("Received POST request");const acceptHeader = req.headers.accept;if (!acceptHeader?.includes("application/json") && !acceptHeader?.includes("text/event-stream")) {res.status(400).json({error: "Invalid Accept header. Expected application/json and/or text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res, req.body);} catch (error) {console.error("Error handling POST request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});app.delete(mcpPath, async (req, res) => {if (!enableSessionManagement) {res.status(405).json({ error: "Method Not Allowed. Session management disabled" });return;}const sessionId = req.headers["mcp-session-id"];if (!sessionId) {res.status(400).json({ error: "Missing Mcp-Session-Id header" });return;}if (sessions[sessionId]) {console.error(`Terminating session: ${sessionId}`);if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];res.status(204).end();} else {res.status(404).json({ error: "Session not found" });}});function listen() {return new Promise((resolve) => {const host = bindToLocalhost ? "127.0.0.1" : "0.0.0.0";httpServer = app.listen(port, host, () => {console.error(`Streamable HTTP Server is running on ${host}:${port}${mcpPath}`);console.error(`Security: Binding to ${bindToLocalhost ? "localhost only" : "all interfaces"}`);console.error(`Session management: ${enableSessionManagement ? "enabled" : "disabled"}`);console.error(`JSON response mode: ${enableJsonResponse ? "enabled" : "disabled (SSE preferred)"}`);resolve();});if (enableSessionManagement) {sessionCleanupInterval = setInterval(cleanupExpiredSessions,5 * 60 * 1e3);}});}async function close() {Object.keys(sessions).forEach((sessionId) => {if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];});if (sessionCleanupInterval) {clearInterval(sessionCleanupInterval);}await cleanup();if (httpServer) {await new Promise((resolve, reject) => {httpServer.close((err) => err ? reject(err) : resolve());});}}return {app,server,listen,close,getActiveSessions: () => Object.keys(sessions),getEventStore: () => defaultEventStore,getStats: () => defaultEventStore instanceof InMemoryEventStore ? defaultEventStore.getStats() : null};
}

创建了一个express2实例,然后返回实例,后面会在外面启动这个node服务,监听

所以我理解,这里的MCP服务就是在本地起了一个node后端服务,监听固定端口,然后这个本地服务接受到这个端口的请求。

app.get(mcpPath, async (req, res) => {try {console.error("Received GET request for SSE connection");if (!req.headers.accept?.includes("text/event-stream")) {res.status(405).json({error: "Method Not Allowed. Expected Accept: text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res);} catch (error) {console.error("Error handling GET request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});

这个 GET 请求是用于建立 **Server-Sent Events (SSE)** 连接的,它的主要作用是:

## 1. **建立实时通信连接**
- 这不是普通的 HTTP GET 请求,而是用于建立 SSE(Server-Sent Events)连接
- 客户端通过这个端点与服务器建立持久连接,用于实时接收服务器推送的数据

## 2. **验证请求类型**
```javascript
if (!req.headers.accept?.includes("text/event-stream")) {
res.status(405).json({
error: "Method Not Allowed. Expected Accept: text/event-stream"
});
return;
}
```
- 检查客户端是否正确请求了 `text/event-stream` 类型
- 这是 SSE 连接的标准 Accept 头

## 3. **管理传输层连接**
```javascript
const { transport, isNew } = getOrCreateTransport(req, res);
if (isNew) {
await server.connect(transport);
}
await transport.handleRequest(req, res);
```
- `getOrCreateTransport()` 获取或创建传输层对象
- 如果是新连接,将 MCP 服务器与传输层连接
- `transport.handleRequest()` 处理 SSE 连接的建立和维护

## 4. **为什么看起来"没有操作数据"?**

这个 GET 请求的作用是:
1. **连接建立**:为客户端建立持久的 SSE 连接
2. **传输准备**:初始化 MCP(Model Context Protocol)服务器的传输层
3. **等待交互**:连接建立后,真正的数据交互通过 POST 请求或 SSE 事件流进行

实际的数据操作会在:
- **POST 请求**:客户端发送具体的 MCP 命令(如调用工具、获取提示等)
- **SSE 事件流**:服务器主动推送数据给客户端

这是一个典型的 **双向通信模式**:
- GET 请求建立 SSE 连接(服务器到客户端的单向流)
- POST 请求发送命令(客户端到服务器)
- 结合起来实现双向实时通信

所以这个 GET 请求虽然看起来简单,但它是整个实时通信架构的基础连接层。

http://www.xdnf.cn/news/1137475.html

相关文章:

  • API开发提速新方案:SmartBear API Hub与ReadyAPI虚拟化整合实践
  • 如何在PyCharm中切换其他虚拟环境
  • OCR 赋能档案数字化:让沉睡的档案 “活” 起来
  • web后端开发(javaweb第十天)
  • yolo8+ASR+NLP+TTS(视觉语音助手)
  • 算法提升之字符串练习-02(字符串哈希)
  • 小红书获取关键词列表API接口详解
  • MongoDB 与MySQL 及es的区别
  • AllDup(重复文件查找)v4.5.70 便携版
  • 基于MATLAB和ZEMAX的光学传递函数与调制传递函数联合仿真
  • 初试Spring AI实现聊天功能
  • mysql——搭建MGR集群
  • 分布式分片策略中,分片数量的评估与选择
  • 基于单片机公交车报站系统/报站器
  • Jenkins Git Parameter 分支不显示前缀origin/或repo/
  • 2024年ASOC SCI2区TOP,基于干扰模型的灰狼优化算法IIE-GWO+复杂丘陵地形农业无人机轨迹规划,深度解析+性能实测
  • 医院各类不良事件上报,PHP+vscode+vue2+element+laravel8+mysql5.7不良事件管理系统源代码,成品源码,不良事件管理系统
  • 板凳-------Mysql cookbook学习 (十一--------12)
  • Python22 —— 标准库(random库)
  • Linux的Ext系列文件系统
  • 【JVM】深入理解 JVM 类加载器
  • 【推荐100个unity插件】使用C#或者unity实现爬虫爬取静态网页数据——Html Agility Pack (HAP)库和XPath 语法的使用
  • Java学习--JVM(2)
  • 学习C++、QT---27(QT中实现记事本项目实现行列显示、优化保存文件的功能的讲解)
  • 【Linux手册】缓冲区:深入浅出,从核心概念到实现逻辑
  • 数据结构:集合操作(Set Operations): 并集(Union)、交集(Intersection)、 差集(Difference)
  • 【37】MFC入门到精通——MFC中 CString 数字字符串 转 WORD ( CString, WORD/int 互转)
  • 编译原理第六到七章(知识点学习/期末复习/笔试/面试)
  • 【真·CPU训模型!】单颗i7家用本,4天0成本跑通中文小模型训练!Xiaothink-T6-mini-Preview 技术预览版开源发布!
  • 数据投毒技术之标签翻转