MailAgentProcess.getInstance
MailAgentProcess.getInstance(app.isPackaged? path.join(resourcePath, 'email_agent').replace('app.asar', 'app.asar.unpacked'): undefined);
牵扯出一堆东西
创建 Mailspring/app/src/mail-agent-process.ts 对象
constructor(executablePath?: string) {super();// 使用正确的路径设置,处理 Electron 打包后的环境this.executablePath = executablePath || path.join(__dirname, '..', 'email_agent');this.start();}
执行this.start()
//这是一个私有的异步方法,用于启动邮件代理进程
private async start() {// 标志设为 false,表示进程处于活动状态this._killed = false;//启动MCP(Model Context Protocol)服务器。获取返回的端口号和访问令牌。await 等待异步操作完成const { port, accessToken } = await this.startMcpServer();// 动态获取可用端口,使用20000-29999范围(避开常用端口)this._agentPort = await getPort({ port: portNumbers(20000, 29999) });// 更新API URLthis._baseUrl = `http://${this._agentHost}:${this._agentPort}`;// 更新API服务的Agent服务URLthis.updateApiServiceUrl();const config = {port: this._agentPort,'mcp-server': `http://localhost:${port}/mcp`,host: 'localhost', // 确保Agent服务绑定到localhost'app-data-path': app.getPath('userData'),mail_db: path.resolve(app.getPath('userData'), 'edgehill.db'),};const argsMap = Object.fromEntries(Object.entries(config).map(([key, value]) => [`--${key}`, String(value)]));const env = {...process.env,PYTHONPATH: '',PYTHONHOME: '',};const args = Object.entries(argsMap).flatMap(([key, value]) => [key, String(value)]);console.log('[MailAgentProcess] args', args);this.proc = spawn(this.executablePath, args, {stdio: ['pipe', 'pipe', 'pipe'],cwd: path.dirname(this.executablePath),env,});this.proc.on('close', (code, signal) => this.handleExit(code, signal));this.proc.on('error', err => this.emit('error', err));this.proc.stdout.on('data', data => {console.log('[MailAgentProcess] stdout', new TextDecoder().decode(data));});this.proc.stderr.on('data', data => {console.log('[MailAgentProcess] stderr', new TextDecoder().decode(data));});}
自习看看这个startMcpServer
private async startMcpServer() {// 获取10000-19999范围内的可用端口(避开常用端口)const port = await getPort({ port: portNumbers(10000, 19999) });const accessToken = uuidv4();//MCP 是一个用于大语言模型与工具系统之间通信的协议标准。//在 Mailspring 中,这个服务器为邮件代理提供结构化的邮件操作接口。startMcpServerProcess({port,//指定 MCP 服务器监听的端口号,MCP 服务器将在此端口提供 HTTP API 服务host: 'localhost',//指定服务器绑定的主机地址。只允许本地访问,防止外部网络攻击
//用于身份验证的访问令牌。确保只有授权的客户端可以访问 MCP 服务每次启动时重新//生成,提高安全性accessToken,configDirPath: '',//指定 MCP 服务器读取配置文件的目录resourcePath: '',//指定静态资源、模板等文件的位置verbose: true,//设置: true 表示输出详细日志transport: 'streamable-http',//指定 MCP 服务器的传输协议类型});return {port,accessToken,};}
transport值
- 可选值:
'streamable-http'
: 流式HTTP传输(推荐)'sse'
: Server-Sent Events 传输
- 优势:
streamable-http
: 支持会话管理、可恢复性、更好的错误处理sse
: 简单的事件流,适用于单向通信
服务器启动后的功能:
- 提供邮件操作API(发送、转发、回复、草稿管理)
- 会话管理和状态维护
- 统计信息收集
- 优雅关闭处理
/*** 快速启动 MCP 服务器的便捷函数** @param options 服务器配置选项* @returns Promise<void>*/
export const startMcpServerProcess = async (options: McpServerProcessOptions): Promise<void> => {const server = makeMcpServer(options);await server.listen();
};
/*** 创建并管理 MCP 服务器实例** @param options 服务器配置选项* @returns 服务器管理对象*/
export const makeMcpServer = (options: McpServerProcessOptions) => {const singleton = McpServerSingleton.getInstance(options);return {/*** 启动服务器*/listen: () => singleton.listen(),/*** 关闭服务器*/close: () => singleton.close(),/*** 重启服务器*/restart: () => singleton.restart(),/*** 检查运行状态*/isRunning: () => singleton.isRunning(),/*** 获取配置信息*/getConfig: () => singleton.getConfig(),/*** 获取统计信息*/getStats: () => singleton.getStats(),/*** 获取活跃会话*/getActiveSessions: () => singleton.getActiveSessions(),};
};
到了/Mailspring/app/src/mailsync-mcp-server-process.ts
private constructor(options: McpServerProcessOptions) {this.options = {port: 3001,host: '0.0.0.0',transport: 'streamable-http',verbose: false,...options,};console.log('Initializing Mailspring MCP Server Singleton...', this.options);}static getInstance(options: McpServerProcessOptions): McpServerSingleton {if (!McpServerSingleton.instance) {McpServerSingleton.instance = new McpServerSingleton(options);}return McpServerSingleton.instance;}/*** 启动 MCP 服务器*/async listen(): Promise<void> {if (this.serverInstance) {console.log('MCP Server is already running');return;}try {this.serverInstance = await startMailspringMCPServer({port: this.options.port,configDirPath: this.options.configDirPath,resourcePath: this.options.resourcePath,verbose: this.options.verbose,transport: this.options.transport,});// 设置进程信号处理this.setupSignalHandlers();} catch (error) {this.serverInstance = null;throw error;}}
先getInstance再执行constructor再执行listen
/Mailspring/app/src/mailsync-mcp-server/index.ts
/*** 运行 Streamable HTTP 传输服务器*/
export async function runStreamableHttpServer(port: number,config: {configDirPath: string;resourcePath: string;verbose?: boolean;}
) {const streamableServer = createStreamableHttpServer({port,mcpPath: '/mcp',createMCPServer: () => createMCPServerInstance(config),requireAccessToken: false, // 在生产环境中应启用enableSessionManagement: false,enableJsonResponse: false, // 优先使用 SSE 流式响应bindToLocalhost: true, // 安全:仅绑定到 localhostallowOrigins: ['*'], // 在生产环境中应限制允许的源});await streamableServer.listen();// 定期输出统计信息const statsInterval = setInterval(() => {const stats = streamableServer.getStats();const activeSessions = streamableServer.getActiveSessions();if (stats || activeSessions.length > 0) {}}, 60000); // 每分钟输出一次// 优雅关闭const cleanup = async () => {clearInterval(statsInterval);await streamableServer.close();};return {listen: () => Promise.resolve(),close: cleanup,server: streamableServer,getStats: () => streamableServer.getStats(),getActiveSessions: () => streamableServer.getActiveSessions(),};
}/*** 简化的服务器启动函数,用于进程管理*/
export async function startMailspringMCPServer(options: {port?: number;host?: string;configDirPath: string;resourcePath: string;verbose?: boolean;transport?: 'sse' | 'streamable-http';
}) {const {port = 3001,transport = 'streamable-http',configDirPath,resourcePath,verbose = false,} = options;const config = { configDirPath, resourcePath, verbose };try {let serverInstance;switch (transport) {case 'sse':serverInstance = await runSseServer(port, config);break;case 'streamable-http':serverInstance = await runStreamableHttpServer(port, config);break;default:throw new Error(`Unsupported transport: ${transport}`);}return {listen: serverInstance.listen,close: serverInstance.close,server: serverInstance.server,port,transport,};} catch (error) {throw error;}
}
执行 serverInstance = await runStreamableHttpServer(port, config);
createStreamableHttpServer是一个编译好的包里面的
function createStreamableHttpServer({port = 3001,mcpPath = "/mcp",requireAccessToken = false,getAccessToken = (req) => req.headers["authorization"]?.replace(/^Bearer\s+/i, ""),validateAccessToken = async () => true,createMCPServer,enableSessionManagement = true,allowOrigins = ["*"],bindToLocalhost = true,enableJsonResponse = false,eventStore,defaultSessionId = uuidv42()
} = {}) {if (!createMCPServer) {throw new Error("createMCPServer is required");}const app = express2();const defaultEventStore = eventStore || new InMemoryEventStore();app.use((req, res, next) => {const origin = req.headers.origin;if (bindToLocalhost && req.headers.host && !req.headers.host.includes("localhost") && !req.headers.host.includes("127.0.0.1")) {res.status(403).json({ error: "Invalid host header" });return;}if (allowOrigins.includes("*") || origin && allowOrigins.includes(origin)) {res.setHeader("Access-Control-Allow-Origin", origin || "*");}res.setHeader("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS");res.setHeader("Access-Control-Allow-Headers","Content-Type, Authorization, Accept, Mcp-Session-Id, Last-Event-ID");res.setHeader("Access-Control-Expose-Headers", "Mcp-Session-Id");if (req.method === "OPTIONS") {res.status(200).end();return;}next();});app.use(express2.json({ limit: "10mb" }));const authMiddleware = createAuthMiddleware({requireAccessToken,getAccessToken,validateAccessToken});app.use(authMiddleware);const { server, cleanup } = createMCPServer();const sessions = {};let httpServer;let sessionCleanupInterval;function getOrCreateTransport(req, res) {let sessionId = req.headers["mcp-session-id"];let isNew = false;console.log("prepare transport for sessionId:", sessionId);if (enableSessionManagement && sessionId && sessions[sessionId]) {sessions[sessionId].lastActivity = Date.now();return {sessionId,transport: sessions[sessionId].transport,isNew: false};}isNew = true;const transport = new StreamableHTTPServerTransport({sessionIdGenerator: enableSessionManagement ? () => {if (!sessionId) {sessionId = uuidv42();res.setHeader("Mcp-Session-Id", sessionId);isNew = true;}return sessionId;} : void 0,enableJsonResponse,eventStore: defaultEventStore,onsessioninitialized: (newSessionId) => {console.error(`Session initialized: ${newSessionId}`);sessionId = newSessionId;isNew = true;}});if (enableSessionManagement && sessionId) {sessions[sessionId] = {transport,lastActivity: Date.now()};}return { sessionId, transport, isNew };}function cleanupExpiredSessions() {const now = Date.now();const sessionTimeout = 30 * 60 * 1e3;Object.entries(sessions).forEach(([sessionId, session]) => {if (now - session.lastActivity > sessionTimeout) {console.error(`Cleaning up expired session: ${sessionId}`);if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];}});}app.get(mcpPath, async (req, res) => {try {console.error("Received GET request for SSE connection");if (!req.headers.accept?.includes("text/event-stream")) {res.status(405).json({error: "Method Not Allowed. Expected Accept: text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res);} catch (error) {console.error("Error handling GET request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});app.post(mcpPath, async (req, res) => {try {console.error("Received POST request");const acceptHeader = req.headers.accept;if (!acceptHeader?.includes("application/json") && !acceptHeader?.includes("text/event-stream")) {res.status(400).json({error: "Invalid Accept header. Expected application/json and/or text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res, req.body);} catch (error) {console.error("Error handling POST request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});app.delete(mcpPath, async (req, res) => {if (!enableSessionManagement) {res.status(405).json({ error: "Method Not Allowed. Session management disabled" });return;}const sessionId = req.headers["mcp-session-id"];if (!sessionId) {res.status(400).json({ error: "Missing Mcp-Session-Id header" });return;}if (sessions[sessionId]) {console.error(`Terminating session: ${sessionId}`);if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];res.status(204).end();} else {res.status(404).json({ error: "Session not found" });}});function listen() {return new Promise((resolve) => {const host = bindToLocalhost ? "127.0.0.1" : "0.0.0.0";httpServer = app.listen(port, host, () => {console.error(`Streamable HTTP Server is running on ${host}:${port}${mcpPath}`);console.error(`Security: Binding to ${bindToLocalhost ? "localhost only" : "all interfaces"}`);console.error(`Session management: ${enableSessionManagement ? "enabled" : "disabled"}`);console.error(`JSON response mode: ${enableJsonResponse ? "enabled" : "disabled (SSE preferred)"}`);resolve();});if (enableSessionManagement) {sessionCleanupInterval = setInterval(cleanupExpiredSessions,5 * 60 * 1e3);}});}async function close() {Object.keys(sessions).forEach((sessionId) => {if (defaultEventStore instanceof InMemoryEventStore) {defaultEventStore.clearStream(sessionId);}delete sessions[sessionId];});if (sessionCleanupInterval) {clearInterval(sessionCleanupInterval);}await cleanup();if (httpServer) {await new Promise((resolve, reject) => {httpServer.close((err) => err ? reject(err) : resolve());});}}return {app,server,listen,close,getActiveSessions: () => Object.keys(sessions),getEventStore: () => defaultEventStore,getStats: () => defaultEventStore instanceof InMemoryEventStore ? defaultEventStore.getStats() : null};
}
创建了一个express2实例,然后返回实例,后面会在外面启动这个node服务,监听
所以我理解,这里的MCP服务就是在本地起了一个node后端服务,监听固定端口,然后这个本地服务接受到这个端口的请求。
app.get(mcpPath, async (req, res) => {try {console.error("Received GET request for SSE connection");if (!req.headers.accept?.includes("text/event-stream")) {res.status(405).json({error: "Method Not Allowed. Expected Accept: text/event-stream"});return;}const { transport, isNew } = getOrCreateTransport(req, res);if (isNew) {await server.connect(transport);}await transport.handleRequest(req, res);} catch (error) {console.error("Error handling GET request:", error);if (!res.headersSent) {res.status(500).json({ error: "Internal server error" });}}});
这个 GET 请求是用于建立 **Server-Sent Events (SSE)** 连接的,它的主要作用是:
## 1. **建立实时通信连接**
- 这不是普通的 HTTP GET 请求,而是用于建立 SSE(Server-Sent Events)连接
- 客户端通过这个端点与服务器建立持久连接,用于实时接收服务器推送的数据
## 2. **验证请求类型**
```javascript
if (!req.headers.accept?.includes("text/event-stream")) {
res.status(405).json({
error: "Method Not Allowed. Expected Accept: text/event-stream"
});
return;
}
```
- 检查客户端是否正确请求了 `text/event-stream` 类型
- 这是 SSE 连接的标准 Accept 头
## 3. **管理传输层连接**
```javascript
const { transport, isNew } = getOrCreateTransport(req, res);
if (isNew) {
await server.connect(transport);
}
await transport.handleRequest(req, res);
```
- `getOrCreateTransport()` 获取或创建传输层对象
- 如果是新连接,将 MCP 服务器与传输层连接
- `transport.handleRequest()` 处理 SSE 连接的建立和维护
## 4. **为什么看起来"没有操作数据"?**
这个 GET 请求的作用是:
1. **连接建立**:为客户端建立持久的 SSE 连接
2. **传输准备**:初始化 MCP(Model Context Protocol)服务器的传输层
3. **等待交互**:连接建立后,真正的数据交互通过 POST 请求或 SSE 事件流进行
实际的数据操作会在:
- **POST 请求**:客户端发送具体的 MCP 命令(如调用工具、获取提示等)
- **SSE 事件流**:服务器主动推送数据给客户端
这是一个典型的 **双向通信模式**:
- GET 请求建立 SSE 连接(服务器到客户端的单向流)
- POST 请求发送命令(客户端到服务器)
- 结合起来实现双向实时通信
所以这个 GET 请求虽然看起来简单,但它是整个实时通信架构的基础连接层。