当前位置: 首页 > ai >正文

SpreadJS 协同服务器 MongoDB 数据库适配支持

为了支持 SpreadJS 协同编辑场景,协同服务器需要持久化存储文档、操作、快照及里程碑数据。本文介绍了 MongoDB 数据库适配器的实现方法,包括集合初始化、适配器接口实现以及里程碑存储支持。

一、MongoDB 集合初始化

协同编辑服务需要以下集合(Collections)来存储数据:

  • documents:存储文档基本信息(类型、版本号、快照版本号)。
  • operations:存储操作记录,用于实现基于 OT 的操作重放。
  • snapshot_fragments:存储快照的分片数据,支持大文档分段管理。
  • milestone_snapshot:存储里程碑快照,用于回溯和恢复。

初始化脚本示例如下:

export async function InitCollections(client) {const collectionsToCreate = [{ name: 'documents', indexes: [{ key: { id: 1 }, unique: true }] },{ name: 'operations', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },{ name: 'snapshot_fragments', indexes: [{ key: { doc_id: 1, fragment_id: 1 }, unique: true }] },{ name: 'milestone_snapshot', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },];await client.connect();const db = client.db(dbName);
const existingCollections = (await db.listCollections().toArray()).map(c => c.name);
for (const col of collectionsToCreate) {if (!existingCollections.includes(col.name)) {await db.createCollection(col.name);console.log(`Collection '${col.name}' created.`);} else {console.log(`Collection '${col.name}' already exists.`);}for (const idx of col.indexes) {await db.collection(col.name).createIndex(idx.key, { unique: idx.unique });}}
await client.close();
}

二、MongoDB 适配器实现

  1. 适配器说明

适配器 MongoDb 继承了协同服务的数据库接口,负责:

  • 文档信息存取
  • 操作记录管理
  • 快照存储与更新
  • 分片快照管理

可根据业务需要,增加 事务(session)并发冲突检测

  1. 适配器核心实现
export class MongoDb extends Db {constructor(client) {super();this.client = client;this.client.connect()this.db = client.db(dbName);}async getDocument(docId) {const documents = this.db.collection('documents');let row = await documents.findOne({ id: docId });if (row) {return {id: row.id,type: row.type,version: row.version,snapshotVersion: row.snapshot_version};}}async getSnapshot(docId) {const documents = this.db.collection('documents');let row = await documents.findOne({ id: docId });if (!row) {return null;}const fragments = await this.getFragments(docId);return {id: row.id,v: row.snapshot_version,type: row.type,fragments: fragments};}async getFragments(docId) {const fragments = this.db.collection('snapshot_fragments');const rows = await fragments.find({ doc_id: docId }).toArray();if (rows.length === 0) {return {};}const results = {};for (const row of rows) {results[row.fragment_id] = JSON.parse(row.data);}return results;}async getFragment(docId, fragmentId) {const fragments = this.db.collection('snapshot_fragments');const row = await fragments.findOne({ doc_id: docId, fragment_id: fragmentId });if (row) {return JSON.parse(row.data);}return null;}async getOps(docId, from, to) {const operations = this.db.collection('operations');const query = { doc_id: docId, version: { $gte: from } };if (to !== undefined) {query.version.$lte = to;}const rows = await operations.find(query).toArray();if (rows.length === 0) {return [];}return rows.map(row => JSON.parse(row.operation));}async commitOp(docId, op, document) {try {const documents = this.db.collection('documents');const operations = this.db.collection('operations');const row = await documents.findOne({ id: docId });if (op.create) {if (row) {throw new Error(`Document with id ${docId} already exists.`);}await documents.insertOne({id: docId,type: document.type,version: document.version,snapshot_version: document.snapshotVersion},);await operations.insertOne({doc_id: docId,version: op.v,operation: JSON.stringify(op)},);return true;}else if (op.del) {if (!row) {throw new Error(`Document with id ${docId} does not exist.`);}await documents.deleteOne({ id: docId },);return true;}else {if (!row || row.version !== op.v) {throw new Error(`Document with id ${docId} does not exist or version mismatch.`);}await operations.insertOne({doc_id: docId,version: op.v,operation: JSON.stringify(op)},);await documents.updateOne({ id: docId },{ $set: { version: document.version } },);return true;}}catch (error) {console.error('Error committing operation:', error);return false;}finally {}}async commitSnapshot(docId, snapshot) {try {const documents = this.db.collection('documents');const fragments = this.db.collection('snapshot_fragments');const row = await documents.findOne({ id: docId },);if (!row) {throw new Error(`Document with id ${docId} does not exist.`);}const currentSnapshotVersion = row.snapshot_version;if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {throw new Error(`Snapshot version mismatch: expected ${currentSnapshotVersion}, got ${snapshot.v}`);}await documents.updateOne({ id: docId },{ $set: { snapshot_version: snapshot.v } },);if (snapshot.fragmentsChanges.deleteSnapshot) {fragments.deleteMany({ doc_id: docId },);}else {const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;if (createFragments) {const createOps = Object.entries(createFragments).map(([id, data]) => ({doc_id: docId,fragment_id: id,data: JSON.stringify(data)}));if (createOps.length > 0) {await fragments.insertMany(createOps,);}}if (updateFragments) {const updateOps = Object.entries(updateFragments).map(([id, data]) => ({updateOne: {filter: { doc_id: docId, fragment_id: id },update: { $set: { data: JSON.stringify(data) } }}}));if (updateOps.length > 0) {await fragments.bulkWrite(updateOps,// { session });}}if (deleteFragments) {const deleteOps = deleteFragments.map(id => ({deleteOne: {filter: { doc_id: docId, fragment_id: id }}}));if (deleteOps.length > 0) {await fragments.bulkWrite(deleteOps,);}}}return true;}catch (error) {console.error('Error committing snapshot:', error);return false;}finally {}}async close() {await this.client.close();}
}

三、里程碑数据存储

里程碑快照用于优化快照恢复性能(避免从头重放所有操作)。 实现类 MongoMilestoneDb 提供 保存读取 接口:

export class MongoMilestoneDb {constructor(client, interval) {this.client = client;this.interval = interval ? interval : 1000;this.db = client.db(dbName);}
async saveMilestoneSnapshot(snapshot) {const milestones = this.db.collection('milestone_snapshot');await milestones.insertOne({doc_id: snapshot.id,version: snapshot.v,snapshot: JSON.stringify(snapshot)});return true;}
async getMilestoneSnapshot(id, version) {const milestones = this.db.collection('milestone_snapshot');const row = await milestones.findOne({ doc_id: id, version: { $lte: version } },{ sort: { version: -1 } });if (row) {return JSON.parse(row.snapshot);}return null;}
}

四、在 DocumentServices 中配置

完成适配器与里程碑数据库后,需要在 DocumentServices 中进行配置:

const documentServices = new OT.DocumentServices(
{ db: new MongoDb(mongoClient),milestoneDb: new MongoMilestoneDb(mongoClient, 500)
});

这样,SpreadJS 协同服务器即可通过 MongoDB 实现文档存储、操作日志管理、快照与里程碑维护,保证协同编辑过程的高效与可扩展。

五、总结

本文展示了 SpreadJS 协同服务器对 MongoDB 数据库的适配实现,主要包括:

  1. 集合初始化:定义所需集合与索引。
  2. 数据库****适配器:支持文档、操作、快照的存储与管理。
  3. 里程碑存储:提供快照的高效回溯能力。
  4. 服务集成:在 DocumentServices 中配置 MongoDB 适配器。

借助 MongoDB 的高性能与灵活数据结构,SpreadJS 协同服务可实现稳定、可扩展的文档协作平台。

扩展链接

使用 MySQL 为 SpreadJS 协同服务器提供存储支持

http://www.xdnf.cn/news/18122.html

相关文章:

  • Flink Checkpoint 原理深度剖析与作用讲解(flink面试高频问题)
  • RK3128增加usb调试模式,开放adb和root权限
  • 分布式搜索(Elasticsearch)深入用法
  • 基于Python的宠物服务管理系统 Python+Django+Vue.js
  • 卫生许可证识别技术:通过OCR与NLP实现高效合规管理,提升审核准确性与效率
  • 传输层协议——UDP和TCP
  • 论文阅读:Prompt Optimization in Large Language Models
  • 传统概率信息检索模型:理论基础、演进与局限
  • 轻度娱乐浪潮下定制开发开源AI智能名片S2B2C商城小程序的机遇与策略
  • RNN(循环神经网络)和Transformer是处理自然语言处理(NLP)任务区别
  • 10.Ansible角色管理
  • 力扣2道dp
  • Rust 入门 生命周期-next2 (十九)
  • flask——4:请求与响应
  • Kubernetes(K8s)常用命令全解析:从基础到进阶
  • Unity进阶--C#补充知识点--【Unity跨平台的原理】Mono与IL2CPP
  • Disbursement on Quarantine Policy(概率、逆元计算期望)
  • 【深度学习】pytorch深度学习框架的环境配置
  • Ansible文件部署与大项目多主机管理
  • 学习嵌入式的第二十天——数据结构
  • redis-集成prometheus监控(k8s)
  • 实习两个月总结
  • 从0到1掌握 Spring Security(第三篇):三种认证方式,按配置一键切换
  • 传统方式部署(RuoYi-Cloud)微服务
  • 像素风球球大作战 HTML 游戏
  • vben admin 下拉支持收索
  • 谷粒商城项目-P3简介-分布式基础概念
  • 牛津大学xDeepMind 自然语言处理(1)
  • Mysql——前模糊索引失效原因及解决方式
  • C++多线程编程深度解析【C++进阶每日一学】