🛠️ Building a Real-Time Collaborative Editor with ABP VNext + CRDT 🎉
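📚 Table of Contents
- 🛠️ Building a Real-Time Collaborative Editor with ABP VNext + CRDT 🎉
- 🧠 Background and Challenges
- 🔹 System Architecture
- 🛣️ End-to-End Flow 🚦
- 🔒 Security and Authentication 🔐
- ✨ Backend Core Code 🖥️
  - CollaborationHub
  - DocumentStateManager
- 🔧 Frontend Component (Yjs + y-signalr) 🖌️
- ⚙️ Environment Setup and Quick Start 🏃‍♂️
- 🚀 Performance Testing and Operations 📈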
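🧠 Background and Challenges
- 💥 Multi-user concurrency: conflicting edits are merged automatically, without locks
- ⚡ Real-time sync: changes are broadcast to all clients within milliseconds
- 🔄 Caching and persistence: state is restored within seconds after a restart, with support for version rollback
Built on Yjs (CRDT), ABP VNext, and SignalR, the result is an enterprise-grade Markdown collaboration system. 🎉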
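To make the "merge without locks" property concrete, here is a minimal sketch of a state-based CRDT. It is a grow-only counter, which is far simpler than the sequence CRDT Yjs uses for text, and it is not Yjs's algorithm. The names `incrementCounter`, `mergeCounters`, and `counterValue` are hypothetical. What it shows is that replicas can be merged in any order, any number of times, and still converge.

```typescript
// Toy state-based CRDT (grow-only counter). Illustrative only: Yjs uses a
// much richer sequence CRDT, but the convergence property is the same idea.
type GCounter = Record<string, number>;

// Each replica only ever increases its own slot.
function incrementCounter(state: GCounter, replicaId: string, by = 1): GCounter {
  return { ...state, [replicaId]: (state[replicaId] ?? 0) + by };
}

// Merge takes the per-replica maximum, which makes it commutative,
// associative, and idempotent.
function mergeCounters(a: GCounter, b: GCounter): GCounter {
  const out: GCounter = { ...a };
  for (const id of Object.keys(b)) {
    out[id] = Math.max(out[id] ?? 0, b[id] ?? 0);
  }
  return out;
}

function counterValue(state: GCounter): number {
  return Object.keys(state).reduce((sum, id) => sum + (state[id] ?? 0), 0);
}

// Two replicas update concurrently without coordinating.
const alice = incrementCounter(incrementCounter({}, "alice"), "alice");
const bob = incrementCounter({}, "bob", 3);

// Merging in either order yields the same state.
const mergedAB = mergeCounters(alice, bob);
const mergedBA = mergeCounters(bob, alice);
```

Both `mergedAB` and `mergedBA` hold the same per-replica counts, and merging a state with itself changes nothing, which is why no lock or central ordering is needed.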
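🔹 System Architecture
- Redis cache: key=`doc:{docId}:state`, TTL = 1 hour
- PostgreSQL: history of the binary CRDT state, with periodic snapshots (every 10 seconds is suggested)
- CRDT GC: periodically merge state and prune the update history to avoid state bloat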
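The cache key format and the suggested 10-second snapshot cadence can be sketched as below. This is an illustration under stated assumptions, not the article's implementation: `cacheKey` and `SnapshotScheduler` are hypothetical names, and the clock is passed in as a number of milliseconds so the behavior is deterministic.

```typescript
// Suggested snapshot cadence from the architecture notes (10 seconds).
const SNAPSHOT_INTERVAL_MS = 10_000;

// Builds the Redis key in the doc:{docId}:state format.
function cacheKey(docId: string): string {
  return `doc:${docId}:state`;
}

// Decides, per document, whether enough time has passed to write a new
// snapshot. Hypothetical helper; a real service might use a timer instead.
class SnapshotScheduler {
  private readonly intervalMs: number;
  private readonly lastSnapshotAt = new Map<string, number>();

  constructor(intervalMs: number = SNAPSHOT_INTERVAL_MS) {
    this.intervalMs = intervalMs;
  }

  // Returns true (and records the time) when a snapshot is due.
  shouldSnapshot(docId: string, nowMs: number): boolean {
    const last = this.lastSnapshotAt.get(docId);
    if (last !== undefined && nowMs - last < this.intervalMs) {
      return false;
    }
    this.lastSnapshotAt.set(docId, nowMs);
    return true;
  }
}
```

Throttling per document keeps a busy document from writing a database row for every keystroke while idle documents cost nothing.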
🛣️ End-to-End Flow 🚦
🔒 Security and Authentication 🔐
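- JWT authentication

      using Microsoft.AspNetCore.Authentication.JwtBearer;
      using Microsoft.AspNetCore.Authorization;

      // Validate bearer tokens issued by the auth server for the "collab-api" audience.
      services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
          .AddJwtBearer(options =>
          {
              options.Authority = "https://your-auth-server";
              options.Audience = "collab-api";
          });

      // Only authenticated users may connect to the hub.
      [Authorize]
      public class CollaborationHub : Hub { … }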
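- Rate limiting 🛑

      using Microsoft.AspNetCore.RateLimiting;

      // Fixed window: at most 100 permits per 1-second window.
      builder.Services.AddRateLimiter(opts =>
          opts.AddFixedWindowLimiter("hubLimiter", o =>
          {
              o.PermitLimit = 100;
              o.Window = TimeSpan.FromSeconds(1);
          }));

      // A named policy only takes effect on endpoints that opt in,
      // for example via RequireRateLimiting("hubLimiter").
      app.UseRateLimiter();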
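- Circuit breaking and retry ♻️

      using Polly;

      // Outer policy: open the circuit for 30 s after 5 consecutive failures.
      // Inner policy: retry up to 3 times.
      // WrapAsync needs async policies, hence CircuitBreakerAsync rather than CircuitBreaker.
      Policy.Handle<Exception>()
          .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30))
          .WrapAsync(Policy.Handle<Exception>().RetryAsync(3));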
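To show what a fixed-window limit of 100 permits per second means in practice, the following is a minimal, dependency-free sketch of the technique. It is not the ASP.NET Core implementation, which offers more options such as queueing. `FixedWindowLimiter` and `tryAcquire` are hypothetical names, and time is passed in explicitly so the example is deterministic.

```typescript
// Minimal fixed-window rate limiter: counts permits within the current
// window and resets the count once the window has elapsed.
class FixedWindowLimiter {
  private readonly permitLimit: number;
  private readonly windowMs: number;
  private windowStart = Number.NEGATIVE_INFINITY;
  private used = 0;

  constructor(permitLimit: number, windowMs: number) {
    this.permitLimit = permitLimit;
    this.windowMs = windowMs;
  }

  // Returns true if the call is allowed at time nowMs, false if rejected.
  tryAcquire(nowMs: number): boolean {
    if (nowMs - this.windowStart >= this.windowMs) {
      // The previous window is over: start a new one at this request.
      this.windowStart = nowMs;
      this.used = 0;
    }
    if (this.used >= this.permitLimit) {
      return false;
    }
    this.used += 1;
    return true;
  }
}

// Same numbers as the configuration above: 100 permits per 1000 ms.
const hubLimiter = new FixedWindowLimiter(100, 1000);
```

A known trade-off of fixed windows is that a burst at the end of one window followed by a burst at the start of the next can briefly let through up to twice the limit; sliding-window or token-bucket limiters smooth that out.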
✨ Backend Core Code 🖥️
CollaborationHub
    using Microsoft.AspNetCore.Authorization;
    using Microsoft.AspNetCore.SignalR;
    using Microsoft.Extensions.Logging;
    using Prometheus;

    [Authorize]
    public class CollaborationHub : Hub
    {
        private readonly ILogger<CollaborationHub> _logger;

        // Static, because a new hub instance is created for each invocation.
        private static readonly Counter MsgCounter =
            Metrics.CreateCounter("hub_messages_total", "Total messages processed by Hub");

        public CollaborationHub(ILogger<CollaborationHub> logger) => _logger = logger;

        // Join the SignalR group named after the docId query parameter.
        public override async Task OnConnectedAsync()
        {
            var docId = Context.GetHttpContext()?.Request.Query["docId"].ToString();
            if (string.IsNullOrWhiteSpace(docId))
                throw new HubException("docId required");
            await Groups.AddToGroupAsync(Context.ConnectionId, docId);
            await base.OnConnectedAsync();
        }

        public override async Task OnDisconnectedAsync(Exception? ex)
        {
            var docId = Context.GetHttpContext()?.Request.Query["docId"].ToString();
            if (!string.IsNullOrWhiteSpace(docId))
                await Groups.RemoveFromGroupAsync(Context.ConnectionId, docId);
            await base.OnDisconnectedAsync(ex);
        }

        // Relay a CRDT delta to every other client editing the same document.
        public async Task SyncOperation(string docId, byte[] delta)
        {
            if (delta.Length > 1_000_000)
                throw new HubException("Delta too large");
            MsgCounter.Inc();
            try
            {
                await Clients.OthersInGroup(docId).SendAsync("ReceiveDelta", delta);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Sync failed for {DocId}", docId);
                throw;
            }
        }
    }
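The relay semantics of the hub (one group per document, a size cap, and delivery to everyone except the sender) can be modelled in memory as follows. This is a sketch for reasoning about the behavior, not SignalR itself; `DocumentGroups`, `join`, `leave`, and `syncOperation` are hypothetical names chosen to mirror the hub's methods.

```typescript
type DeltaHandler = (delta: Uint8Array) => void;

// Same cap as the hub's SyncOperation check.
const MAX_DELTA_BYTES = 1_000_000;

// In-memory model of "one group per document, broadcast to others".
class DocumentGroups {
  private readonly groups = new Map<string, Map<string, DeltaHandler>>();

  join(docId: string, connectionId: string, onDelta: DeltaHandler): void {
    let members = this.groups.get(docId);
    if (!members) {
      members = new Map<string, DeltaHandler>();
      this.groups.set(docId, members);
    }
    members.set(connectionId, onDelta);
  }

  leave(docId: string, connectionId: string): void {
    const members = this.groups.get(docId);
    if (!members) return;
    members.delete(connectionId);
    if (members.size === 0) this.groups.delete(docId);
  }

  // Delivers the delta to every member of the document's group except the
  // sender, and returns how many connections received it.
  syncOperation(docId: string, senderId: string, delta: Uint8Array): number {
    if (delta.byteLength > MAX_DELTA_BYTES) {
      throw new Error("Delta too large");
    }
    const members = this.groups.get(docId);
    if (!members) return 0;
    let delivered = 0;
    members.forEach((handler, connectionId) => {
      if (connectionId === senderId) return;
      handler(delta);
      delivered += 1;
    });
    return delivered;
  }
}
```

Skipping the sender matters for a CRDT editor: the sender has already applied its own update locally, so echoing it back would only cost bandwidth.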
DocumentStateManager
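    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Caching.Distributed;
    using Prometheus;
    using Volo.Abp.DependencyInjection;
    using Volo.Abp.Domain.Repositories;
    using Volo.Abp.Timing;

    public class DocumentStateManager : ITransientDependency
    {
        private readonly IDistributedCache _cache;
        private readonly IRepository<DocumentSnapshot, Guid> _repo;
        // Clock.Now is only available in ABP base classes, so IClock is injected here.
        private readonly IClock _clock;

        private static readonly Histogram PersistHist =
            Metrics.CreateHistogram("persist_duration_seconds", "DB persist duration");

        public DocumentStateManager(
            IDistributedCache cache,
            IRepository<DocumentSnapshot, Guid> repo,
            IClock clock)
        {
            _cache = cache;
            _repo = repo;
            _clock = clock;
        }

        // Write the latest state to the cache (1-hour TTL), then persist a snapshot.
        public async Task SaveAsync(string docId, byte[] crdtState)
        {
            await _cache.SetAsync($"doc:{docId}:state", crdtState,
                new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1) });

            using var _ = PersistHist.NewTimer();
            await _repo.InsertAsync(new DocumentSnapshot
            {
                Id = Guid.NewGuid(),
                DocId = docId,
                CrdtState = crdtState,
                Timestamp = _clock.Now
            }, autoSave: true);
        }

        // Read from the cache first; on a miss, fall back to the newest snapshot.
        public async Task<byte[]?> LoadAsync(string docId)
        {
            var cached = await _cache.GetAsync($"doc:{docId}:state");
            if (cached is not null) return cached;

            var query = await _repo.GetQueryableAsync();
            var latest = await query
                .Where(x => x.DocId == docId)
                .OrderByDescending(x => x.Timestamp)
                .FirstOrDefaultAsync(); // EF Core extension method
            return latest?.CrdtState;
        }
    }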
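The load path above is a cache-aside read: try the cache, and fall back to the newest persisted snapshot when the entry is missing or expired. The sketch below models that with in-memory stand-ins for Redis and PostgreSQL. `InMemoryStateStore` and `StoredSnapshot` are hypothetical names, and time is passed in explicitly rather than read from a clock.

```typescript
interface StoredSnapshot {
  docId: string;
  state: Uint8Array;
  timestamp: number;
}

// In-memory stand-in for the Redis cache plus the snapshot table.
class InMemoryStateStore {
  private readonly ttlMs: number;
  private readonly cache = new Map<string, { state: Uint8Array; expiresAt: number }>();
  private readonly snapshots: StoredSnapshot[] = [];

  constructor(ttlMs: number = 60 * 60 * 1000) {
    this.ttlMs = ttlMs; // default mirrors the 1-hour TTL
  }

  // Writes the cache entry and appends a snapshot row.
  save(docId: string, state: Uint8Array, nowMs: number): void {
    this.cache.set(`doc:${docId}:state`, { state, expiresAt: nowMs + this.ttlMs });
    this.snapshots.push({ docId, state, timestamp: nowMs });
  }

  // Cache-aside read: a fresh cache hit wins, otherwise the newest snapshot.
  load(docId: string, nowMs: number): Uint8Array | undefined {
    const hit = this.cache.get(`doc:${docId}:state`);
    if (hit && hit.expiresAt > nowMs) {
      return hit.state;
    }
    let latest: StoredSnapshot | undefined;
    for (const snap of this.snapshots) {
      if (snap.docId === docId && (!latest || snap.timestamp >= latest.timestamp)) {
        latest = snap;
      }
    }
    return latest?.state;
  }
}
```

Because every save also appends a snapshot, an expired or evicted cache entry never loses data; it only makes the next load slower.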
🔧 Frontend Component (Yjs + y-signalr) 🖌️
    import * as Y from 'yjs';
    import * as monaco from 'monaco-editor';
    import { MonacoBinding } from 'y-monaco';
    import { SignalrProvider } from 'y-signalr';

    // Shared document, synced through the SignalR hub.
    const ydoc = new Y.Doc();
    const provider = new SignalrProvider("wss://your-domain/hub?docId=mydoc", "markdown-room", ydoc);
    const yText = ydoc.getText("monaco");

    const editor = monaco.editor.create(document.getElementById("editor"), { language: "markdown" });

    // Bind the shared text to the Monaco model; awareness carries remote cursors and selections.
    new MonacoBinding(yText, editor.getModel(), new Set([editor]), provider.awareness);
⚙️ Environment Setup and Quick Start 🏃‍♂️

    # Pin versions
    dotnet add package Volo.Abp.SignalR --version 7.4.0
    dotnet add package Yjs.SignalR --version 1.2.3
    npm install yjs@13.5.25 y-signalr@1.0.0 y-monaco@0.3.0 monaco-editor@0.45.0
docker-compose.yml:

    version: "3.8"
    services:
      redis:
        image: redis:7.0
        ports: ["6379:6379"]
      postgres:
        image: postgres:15.2
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: mypwd
        ports: ["5432:5432"]
      app:
        build: .
        environment:
          ConnectionStrings__Default: "Host=postgres;User=postgres;Password=mypwd;Database=collab"
        depends_on: ["redis","postgres"]
        ports: ["5000:80"]
Then start everything:

    docker-compose up -d
    dotnet run --project src/Collab.Api
    npm run dev --prefix src/Collab.Web
🚀 Performance Testing and Operations 📈
- Artillery load test: exercise `SyncOperation` with 500 concurrent users
- Prometheus & Grafana:
  - `hub_messages_total` 🕹️
  - `persist_duration_seconds` ⏱️
  - `redis_cache_hit_ratio` 🔍
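- CRDT GC: keep garbage collection enabled on the `Y.Doc` (its `gc` option, which Yjs turns on by default) so that redundant state left by deleted content is reclaimed
- Monitoring alerts: fire when the cache hit ratio drops below 80% or persistence latency exceeds 500 ms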
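The two alert thresholds can be expressed as a small pure function, which is handy for unit-testing alert rules before encoding them in Prometheus. This is a sketch under stated assumptions: `MetricsSample` and `evaluateAlerts` are hypothetical names, the alert identifiers are made up for illustration, and a sample with no cache traffic is treated as healthy.

```typescript
interface MetricsSample {
  cacheHits: number;
  cacheMisses: number;
  persistSeconds: number; // observed persistence latency, in seconds
}

const MIN_CACHE_HIT_RATIO = 0.8;   // alert below 80%
const MAX_PERSIST_SECONDS = 0.5;   // alert above 500 ms

// Returns the names of the alerts that should fire for this sample.
function evaluateAlerts(sample: MetricsSample): string[] {
  const alerts: string[] = [];
  const total = sample.cacheHits + sample.cacheMisses;
  // Assumption: with no lookups at all there is nothing to alert on.
  const hitRatio = total === 0 ? 1 : sample.cacheHits / total;
  if (hitRatio < MIN_CACHE_HIT_RATIO) {
    alerts.push("cache_hit_ratio_low");
  }
  if (sample.persistSeconds > MAX_PERSIST_SECONDS) {
    alerts.push("persist_latency_high");
  }
  return alerts;
}
```

For example, a sample with 70 hits, 30 misses, and a 0.6-second persist would trigger both alerts, while 90 hits, 10 misses, and 0.1 seconds would trigger none.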