Vue3从入门到精通: 4.5 数据持久化与同步策略深度解析
👋 大家好,我是阿问学长!专注于分享优质开源项目解析、毕业设计项目指导支持、幼小初高的教辅资料推荐等,欢迎关注交流!🚀
数据持久化与同步策略深度解析
🎯 学习目标
通过本文,你将深入掌握:
- 多层次数据持久化策略的设计
- 离线优先的数据同步机制
- 实时数据同步和冲突解决方案
- 数据版本控制和增量同步技术
- 大规模应用的数据管理架构
💾 多层次数据持久化架构
持久化存储的分层设计
现代Web应用需要在多个层次上进行数据持久化,以提供最佳的用户体验和数据可靠性:
// Layered persistence architecture
/*
┌─────────────────────────────────────┐
│ Memory Layer                        │ ← fastest access, volatile
├─────────────────────────────────────┤
│ Session Storage                     │ ← session-scoped persistence
├─────────────────────────────────────┤
│ Local Storage                       │ ← persistent storage
├─────────────────────────────────────┤
│ IndexedDB                           │ ← structured, high-capacity storage
├─────────────────────────────────────┤
│ Cache API                           │ ← Service Worker cache
├─────────────────────────────────────┤
│ Remote Server                       │ ← authoritative data source
└─────────────────────────────────────┘
*/

/**
 * Reads and writes values through four storage layers registered in
 * `this.layers`: 'memory', 'session', 'local' and 'indexeddb'.
 *
 * Each layer is a plain object with a numeric `priority` (lower is consulted
 * first) and async `get` / `set` / `delete` methods; some also expose
 * `cleanup`. Only the local and IndexedDB layers honour `options.expires`
 * (an absolute epoch-millisecond timestamp); the memory layer applies its own
 * TTL and the session layer stores the raw value without expiry metadata.
 */
class LayeredPersistenceManager {
  constructor() {
    this.layers = new Map()
    this.setupLayers()
  }

  /** Registers the four built-in layers. */
  setupLayers() {
    // Memory layer - fastest access
    this.layers.set('memory', {
      storage: new Map(),
      priority: 1,
      capacity: 1000, // maximum number of entries
      ttl: 5 * 60 * 1000, // 5-minute TTL
      async get(key) {
        const item = this.storage.get(key)
        if (!item) return null
        // Expired entries are dropped lazily on read
        if (Date.now() - item.timestamp > this.ttl) {
          this.storage.delete(key)
          return null
        }
        return item.data
      },
      async set(key, data) {
        // Overwriting an existing key does not grow the map, so only evict for new keys
        if (!this.storage.has(key) && this.storage.size >= this.capacity) {
          this.evictOldest()
        }
        this.storage.set(key, { data, timestamp: Date.now() })
      },
      async delete(key) {
        this.storage.delete(key)
      },
      evictOldest() {
        const entries = Array.from(this.storage.entries())
        entries.sort((a, b) => a[1].timestamp - b[1].timestamp)
        // Remove the oldest 25%, but at least one entry so small capacities are still enforced
        const toDelete = Math.min(entries.length, Math.max(1, Math.floor(entries.length * 0.25)))
        for (let i = 0; i < toDelete; i++) {
          this.storage.delete(entries[i][0])
        }
      }
    })

    // Session storage layer
    this.layers.set('session', {
      priority: 2,
      capacity: 5 * 1024 * 1024, // 5MB, measured in string length
      async get(key) {
        try {
          const item = sessionStorage.getItem(key)
          return item ? JSON.parse(item) : null
        } catch (error) {
          console.error('Session storage get error:', error)
          return null
        }
      },
      async set(key, data) {
        try {
          const serialized = JSON.stringify(data)
          if (this.getStorageSize() + serialized.length > this.capacity) {
            this.cleanup()
          }
          sessionStorage.setItem(key, serialized)
        } catch (error) {
          console.error('Session storage set error:', error)
        }
      },
      async delete(key) {
        try {
          sessionStorage.removeItem(key)
        } catch (error) {
          console.error('Session storage delete error:', error)
        }
      },
      getStorageSize() {
        // Index-based iteration only visits stored keys (for...in also visits Storage members)
        let total = 0
        for (let i = 0; i < sessionStorage.length; i++) {
          const value = sessionStorage.getItem(sessionStorage.key(i))
          total += value ? value.length : 0
        }
        return total
      },
      cleanup() {
        // Removes entries whose stored value carries a `_timestamp` older than 30 minutes
        const now = Date.now()
        const expiredKeys = []
        for (let i = 0; i < sessionStorage.length; i++) {
          const key = sessionStorage.key(i)
          try {
            const item = JSON.parse(sessionStorage.getItem(key))
            if (item && item._timestamp && now - item._timestamp > 30 * 60 * 1000) {
              expiredKeys.push(key)
            }
          } catch (error) {
            // Not JSON: leave the entry alone
          }
        }
        // Remove after the scan so indices do not shift while iterating
        expiredKeys.forEach(key => sessionStorage.removeItem(key))
      }
    })

    // Local storage layer
    this.layers.set('local', {
      priority: 3,
      capacity: 10 * 1024 * 1024, // 10MB, used as the basis for the cleanup target
      async get(key) {
        try {
          const item = localStorage.getItem(key)
          if (!item) return null
          const parsed = JSON.parse(item)
          if (parsed._expires && Date.now() > parsed._expires) {
            localStorage.removeItem(key)
            return null
          }
          // Entries not written by this layer have no `data` field; report them as a miss
          return parsed.data === undefined ? null : parsed.data
        } catch (error) {
          console.error('Local storage get error:', error)
          return null
        }
      },
      async set(key, data, options = {}) {
        // Declared outside the try block so the quota-retry path can reuse it
        let serialized
        try {
          serialized = JSON.stringify({
            data,
            _timestamp: Date.now(),
            _expires: options.expires || null
          })
          localStorage.setItem(key, serialized)
        } catch (error) {
          if (error.name === 'QuotaExceededError' && serialized !== undefined) {
            this.cleanup()
            // Retry once after freeing space
            try {
              localStorage.setItem(key, serialized)
            } catch (retryError) {
              console.error('Local storage quota exceeded:', retryError)
            }
          } else {
            console.error('Local storage set error:', error)
          }
        }
      },
      async delete(key) {
        try {
          localStorage.removeItem(key)
        } catch (error) {
          console.error('Local storage delete error:', error)
        }
      },
      cleanup() {
        // Only entries written by this layer (objects with a numeric `_timestamp`)
        // are eviction candidates; other keys in localStorage are left untouched.
        const items = []
        for (let i = 0; i < localStorage.length; i++) {
          const key = localStorage.key(i)
          const raw = localStorage.getItem(key)
          try {
            const item = JSON.parse(raw)
            if (item && typeof item === 'object' && typeof item._timestamp === 'number') {
              items.push({ key, timestamp: item._timestamp, size: raw.length })
            }
          } catch (error) {
            // Not JSON, so not ours: skip
          }
        }
        // Evict oldest first until 20% of the nominal capacity has been freed
        items.sort((a, b) => a.timestamp - b.timestamp)
        const targetSpace = this.capacity * 0.2
        let freedSpace = 0
        for (const item of items) {
          localStorage.removeItem(item.key)
          freedSpace += item.size
          if (freedSpace >= targetSpace) {
            break
          }
        }
      }
    })

    // IndexedDB layer
    this.layers.set('indexeddb', {
      priority: 4,
      db: null,
      initPromise: null,
      init() {
        // Share one open request between concurrent callers; allow a retry after failure
        if (!this.initPromise) {
          this.initPromise = new Promise((resolve, reject) => {
            const request = indexedDB.open('AppDatabase', 1)
            request.onerror = () => reject(request.error)
            request.onsuccess = () => {
              this.db = request.result
              resolve()
            }
            request.onupgradeneeded = (event) => {
              const db = event.target.result
              if (!db.objectStoreNames.contains('cache')) {
                const store = db.createObjectStore('cache', { keyPath: 'key' })
                store.createIndex('timestamp', 'timestamp', { unique: false })
                store.createIndex('expires', 'expires', { unique: false })
              }
            }
          }).catch(error => {
            this.initPromise = null
            throw error
          })
        }
        return this.initPromise
      },
      async get(key) {
        if (!this.db) await this.init()
        return new Promise((resolve, reject) => {
          const transaction = this.db.transaction(['cache'], 'readonly')
          const request = transaction.objectStore('cache').get(key)
          request.onerror = () => reject(request.error)
          request.onsuccess = () => {
            const result = request.result
            if (!result) {
              resolve(null)
              return
            }
            if (result.expires && Date.now() > result.expires) {
              // Best-effort removal; the read itself still resolves as a miss
              this.delete(key).catch(error => {
                console.error('IndexedDB expired entry removal failed:', error)
              })
              resolve(null)
              return
            }
            resolve(result.data)
          }
        })
      },
      async set(key, data, options = {}) {
        if (!this.db) await this.init()
        return new Promise((resolve, reject) => {
          const transaction = this.db.transaction(['cache'], 'readwrite')
          const store = transaction.objectStore('cache')
          const item = {
            key,
            data,
            timestamp: Date.now(),
            expires: options.expires || null,
            size: JSON.stringify(data).length
          }
          const request = store.put(item)
          request.onerror = () => reject(request.error)
          request.onsuccess = () => resolve()
        })
      },
      async delete(key) {
        if (!this.db) await this.init()
        return new Promise((resolve, reject) => {
          const transaction = this.db.transaction(['cache'], 'readwrite')
          const request = transaction.objectStore('cache').delete(key)
          request.onerror = () => reject(request.error)
          request.onsuccess = () => resolve()
        })
      },
      async cleanup() {
        if (!this.db) await this.init()
        // Resolve only once the whole transaction has finished deleting expired rows
        return new Promise((resolve, reject) => {
          const transaction = this.db.transaction(['cache'], 'readwrite')
          const index = transaction.objectStore('cache').index('expires')
          const request = index.openCursor(IDBKeyRange.upperBound(Date.now()))
          request.onsuccess = (event) => {
            const cursor = event.target.result
            if (cursor) {
              cursor.delete()
              cursor.continue()
            }
          }
          transaction.oncomplete = () => resolve()
          transaction.onerror = () => reject(transaction.error)
          transaction.onabort = () => reject(transaction.error)
        })
      }
    })
  }

  /** Returns `[name, layer]` pairs ordered by ascending `priority`. */
  getLayersByPriority() {
    return Array.from(this.layers.entries()).sort((a, b) => a[1].priority - b[1].priority)
  }

  /**
   * Looks the key up layer by layer in priority order.
   * @param {string} key
   * @param {object} [options] currently unused
   * @returns {Promise<*>} the first non-null value found, or null
   */
  async get(key, options = {}) {
    for (const [name, layer] of this.getLayersByPriority()) {
      try {
        const data = await layer.get(key)
        if (data != null) {
          // Not awaited on purpose: propagateUp logs its own failures
          this.propagateUp(key, data, name)
          return data
        }
      } catch (error) {
        console.error(`Error getting from ${name} layer:`, error)
      }
    }
    return null
  }

  /**
   * Writes the value to every layer; a failing layer is logged, not rethrown.
   * @param {string} key
   * @param {*} data
   * @param {{expires?: number}} [options]
   */
  async set(key, data, options = {}) {
    const promises = []
    for (const [name, layer] of this.layers) {
      promises.push(
        Promise.resolve()
          .then(() => layer.set(key, data, options))
          .catch(error => {
            console.error(`Error setting to ${name} layer:`, error)
          })
      )
    }
    await Promise.allSettled(promises)
  }

  /**
   * Removes the key from every layer that exposes `delete`.
   * @param {string} key
   */
  async delete(key) {
    const promises = []
    for (const [name, layer] of this.layers) {
      if (layer.delete) {
        promises.push(
          Promise.resolve()
            .then(() => layer.delete(key))
            .catch(error => {
              console.error(`Error deleting from ${name} layer:`, error)
            })
        )
      }
    }
    await Promise.allSettled(promises)
  }

  /**
   * Copies a value found in `fromLayer` into every layer with a lower priority
   * number. The copy is written without options, so no expiry is carried over.
   */
  async propagateUp(key, data, fromLayer) {
    const source = this.layers.get(fromLayer)
    if (!source) return
    for (const [name, layer] of this.getLayersByPriority()) {
      if (layer.priority >= source.priority) continue
      try {
        await layer.set(key, data)
      } catch (error) {
        console.error(`Error propagating to ${name} layer:`, error)
      }
    }
  }

  /** Runs `cleanup` on every layer that has one; failures are logged. */
  async cleanup() {
    const promises = []
    for (const [name, layer] of this.layers) {
      if (layer.cleanup) {
        // Some layer cleanups are synchronous, so wrap the call before chaining
        promises.push(
          Promise.resolve()
            .then(() => layer.cleanup())
            .catch(error => {
              console.error(`Error cleaning up ${name} layer:`, error)
            })
        )
      }
    }
    await Promise.allSettled(promises)
  }
}
// Using layered persistence in a Pinia store
/**
 * Pinia setup store that keeps an in-memory Map of values in front of a
 * LayeredPersistenceManager.
 *
 * Exposes read-only `data` and `loading`, plus `getData`, `setData` and
 * `deleteData`.
 */
export const usePersistentStore = defineStore('persistent', () => {
  const persistenceManager = new LayeredPersistenceManager()

  // State
  const data = ref(new Map())
  const loading = ref(false)
  // Number of reads in flight, so one finishing read cannot clear `loading` for another
  let pendingReads = 0

  /**
   * Returns the value for `key`, checking the store's Map first and then the
   * persistence layers.
   * @returns {Promise<*>} the value, or null when no layer has it
   */
  const getData = async (key) => {
    pendingReads++
    loading.value = true
    try {
      if (data.value.has(key)) {
        return data.value.get(key)
      }
      const persistedData = await persistenceManager.get(key)
      // Compare against null/undefined so stored 0, '' and false are still returned
      if (persistedData !== null && persistedData !== undefined) {
        data.value.set(key, persistedData)
        return persistedData
      }
      return null
    } finally {
      pendingReads--
      loading.value = pendingReads > 0
    }
  }

  /** Stores the value in the Map and writes it through to every layer. */
  const setData = async (key, value, options = {}) => {
    data.value.set(key, value)
    await persistenceManager.set(key, value, options)
  }

  /**
   * Removes the key from the Map and from every layer that exposes `delete`.
   * Layers without a `delete` method are skipped, so they may still hold the value.
   */
  const deleteData = async (key) => {
    data.value.delete(key)
    for (const [name, layer] of persistenceManager.layers) {
      if (!layer.delete) continue
      try {
        await layer.delete(key)
      } catch (error) {
        console.error(`Error deleting from ${name} layer:`, error)
      }
    }
  }

  return {
    data: readonly(data),
    loading: readonly(loading),
    getData,
    setData,
    deleteData
  }
})
🔄 离线优先的数据同步
离线数据管理策略
// 离线优先数据管理器
/**
 * Offline-first data manager: every write is applied to local storage first,
 * queued as a sync operation, and pushed to `/api/<collection>` when online.
 *
 * Depends on globals not defined in this class: `generateId`,
 * `ConflictResolver`, `navigator`, `window`, `localStorage` and `fetch`.
 * `getIndexedDB()` is a placeholder that must be implemented before any
 * local-storage method can work.
 */
class OfflineFirstDataManager {
  constructor() {
    this.syncQueue = []
    this.conflictResolver = new ConflictResolver()
    this.networkStatus = navigator.onLine
    this.syncInFlight = null
    this.syncTimer = null
    this.handleOnline = null
    this.handleOffline = null
    this.setupNetworkListeners()
    this.setupPeriodicSync()
  }

  /** Tracks online/offline events; going online starts a sync run. */
  setupNetworkListeners() {
    this.handleOnline = () => {
      this.networkStatus = true
      this.triggerSync()
    }
    this.handleOffline = () => {
      this.networkStatus = false
    }
    window.addEventListener('online', this.handleOnline)
    window.addEventListener('offline', this.handleOffline)
  }

  /** Registers a 'background-sync' tag when supported and starts a 30-second sync timer. */
  setupPeriodicSync() {
    const registrationProto = window.ServiceWorkerRegistration && window.ServiceWorkerRegistration.prototype
    if ('serviceWorker' in navigator && registrationProto && 'sync' in registrationProto) {
      navigator.serviceWorker.ready
        .then(registration => registration.sync.register('background-sync'))
        .catch(error => {
          console.warn('Background sync registration failed:', error)
        })
    }
    // Periodic sync while online
    this.syncTimer = setInterval(() => {
      if (this.networkStatus) {
        this.triggerSync()
      }
    }, 30000)
  }

  /** Stops the timer and removes the network listeners added by this instance. */
  destroy() {
    if (this.syncTimer !== null) {
      clearInterval(this.syncTimer)
      this.syncTimer = null
    }
    if (this.handleOnline) window.removeEventListener('online', this.handleOnline)
    if (this.handleOffline) window.removeEventListener('offline', this.handleOffline)
  }

  /** Starts a sync run without awaiting it; failures are logged. */
  triggerSync() {
    this.processSyncQueue().catch(error => {
      console.error('Sync queue processing failed:', error)
    })
  }

  /**
   * Saves an entity locally and queues it for sync.
   * @param {string} collection
   * @param {*} id existing entity id; falsy means "create" and a new id is generated
   * @param {object} data
   * @param {{maxRetries?: number}} [options]
   * @returns {Promise<*>} the entity id used locally
   */
  async createOrUpdate(collection, id, data, options = {}) {
    const operation = {
      id: generateId(),
      type: id ? 'update' : 'create',
      collection,
      entityId: id || generateId(),
      data,
      timestamp: Date.now(),
      clientId: this.getClientId(),
      retry: 0,
      // `??` keeps an explicit 0 ("never retry") instead of replacing it with 3
      maxRetries: options.maxRetries ?? 3
    }
    // Update local storage immediately
    await this.updateLocalStorage(collection, operation.entityId, {
      ...data,
      _localId: operation.entityId,
      _lastModified: operation.timestamp,
      _syncStatus: 'pending'
    })
    this.syncQueue.push(operation)
    if (this.networkStatus) {
      this.triggerSync()
    }
    return operation.entityId
  }

  /** Soft-deletes the entity locally and queues a server delete. */
  async delete(collection, id) {
    const operation = {
      id: generateId(),
      type: 'delete',
      collection,
      entityId: id,
      timestamp: Date.now(),
      clientId: this.getClientId(),
      retry: 0,
      maxRetries: 3
    }
    await this.updateLocalStorage(collection, id, {
      _deleted: true,
      _lastModified: operation.timestamp,
      _syncStatus: 'pending'
    })
    this.syncQueue.push(operation)
    if (this.networkStatus) {
      this.triggerSync()
    }
  }

  /**
   * Returns local records matching `filter`, merged with server data when online.
   * A filter value of the form `{ operator, value }` is evaluated by
   * `applyOperator`; any other value is compared with `===`.
   */
  async query(collection, filter = {}) {
    const localData = await this.getLocalData(collection)
    let results = localData.filter(item => {
      if (item._deleted) return false
      return Object.entries(filter).every(([key, value]) => {
        // typeof null is 'object', so exclude it before reading `.operator`
        if (value !== null && typeof value === 'object' && value.operator) {
          return this.applyOperator(item[key], value.operator, value.value)
        }
        return item[key] === value
      })
    })
    if (this.networkStatus) {
      try {
        const serverData = await this.fetchFromServer(collection, filter)
        results = await this.mergeWithServerData(results, serverData)
      } catch (error) {
        console.warn('Failed to fetch from server, using local data:', error)
      }
    }
    return results
  }

  /**
   * Sends queued operations to the server, one at a time and in queue order.
   * Calls made while a run is in progress receive that run's promise, so two
   * runs never overlap.
   * @returns {Promise<void>}
   */
  processSyncQueue() {
    if (this.syncInFlight) return this.syncInFlight
    if (!this.networkStatus || this.syncQueue.length === 0) return Promise.resolve()
    const run = this.drainSyncQueue().finally(() => {
      this.syncInFlight = null
    })
    this.syncInFlight = run
    return run
  }

  /** Processes the queue until it is empty or the network drops. */
  async drainSyncQueue() {
    // Failed operations wait for the next run instead of being retried in a tight loop
    const retryLater = []
    while (this.networkStatus && this.syncQueue.length > 0) {
      const operation = this.syncQueue.shift()
      try {
        await this.syncOperation(operation)
      } catch (error) {
        console.error('Sync operation failed:', error)
        if (operation.retry < operation.maxRetries) {
          operation.retry++
          retryLater.push(operation)
        } else {
          try {
            await this.markSyncFailed(operation)
          } catch (markError) {
            console.error('Failed to record sync failure:', markError)
          }
        }
      }
    }
    this.syncQueue = [...retryLater, ...this.syncQueue]
  }

  /** Performs one queued operation against the server and updates the local copy. */
  async syncOperation(operation) {
    const { type, collection, entityId, data } = operation
    if (type === 'create') {
      const created = await this.createOnServer(collection, data)
      await this.updateLocalAfterSync(collection, entityId, created, 'synced')
    } else if (type === 'update') {
      const updated = await this.updateOnServer(collection, entityId, data)
      await this.updateLocalAfterSync(collection, entityId, updated, 'synced')
    } else if (type === 'delete') {
      await this.deleteOnServer(collection, entityId)
      await this.removeFromLocal(collection, entityId)
    }
  }

  /**
   * Merges local records with server records.
   * - server-only records are added as 'synced'
   * - locally synced records are replaced when the server's `updatedAt` is greater
   * - records with unsynced local changes go through the conflict resolver
   */
  async mergeWithServerData(localData, serverData) {
    const merged = new Map()
    localData.forEach(item => {
      // updateLocalStorage always writes `id`, so prefer it; it is also what server rows are keyed by
      merged.set(item.id ?? item._localId, item)
    })
    for (const serverItem of serverData) {
      const localItem = merged.get(serverItem.id)
      if (!localItem) {
        merged.set(serverItem.id, { ...serverItem, _syncStatus: 'synced' })
      } else if (localItem._syncStatus === 'synced') {
        if (serverItem.updatedAt > localItem.updatedAt) {
          merged.set(serverItem.id, { ...serverItem, _syncStatus: 'synced' })
        }
      } else {
        const resolved = await this.conflictResolver.resolve(localItem, serverItem, 'merge')
        merged.set(serverItem.id, { ...resolved, _syncStatus: 'conflict_resolved' })
      }
    }
    return Array.from(merged.values())
  }

  // Local storage operations

  /** Writes `{ ...data, id }` into the collection's object store, replacing any previous record. */
  async updateLocalStorage(collection, id, data) {
    const db = await this.getIndexedDB()
    const transaction = db.transaction([collection], 'readwrite')
    const store = transaction.objectStore(collection)
    await new Promise((resolve, reject) => {
      const request = store.put({ ...data, id })
      request.onsuccess = () => resolve()
      request.onerror = () => reject(request.error)
    })
  }

  /** Returns every record in the collection's object store. */
  async getLocalData(collection) {
    const db = await this.getIndexedDB()
    const transaction = db.transaction([collection], 'readonly')
    const store = transaction.objectStore(collection)
    return new Promise((resolve, reject) => {
      const request = store.getAll()
      request.onsuccess = () => resolve(request.result)
      request.onerror = () => reject(request.error)
    })
  }

  /**
   * Replaces the local record with the server's response after a successful
   * create/update. If the response carries a different `id`, the record stored
   * under the local id is removed first.
   * NOTE(review): assumes the server response is the saved entity — confirm against the API.
   */
  async updateLocalAfterSync(collection, entityId, serverRecord, status) {
    const record = serverRecord !== null && typeof serverRecord === 'object' ? serverRecord : {}
    const storedId = record.id ?? entityId
    if (storedId !== entityId) {
      await this.removeFromLocal(collection, entityId)
    }
    await this.updateLocalStorage(collection, storedId, {
      ...record,
      _localId: entityId,
      _lastModified: Date.now(),
      _syncStatus: status
    })
  }

  /** Deletes the record from the collection's object store. */
  async removeFromLocal(collection, id) {
    const db = await this.getIndexedDB()
    const transaction = db.transaction([collection], 'readwrite')
    const store = transaction.objectStore(collection)
    await new Promise((resolve, reject) => {
      const request = store.delete(id)
      request.onsuccess = () => resolve()
      request.onerror = () => reject(request.error)
    })
  }

  /** Marks the local record as 'failed' once an operation has used up its retries. */
  async markSyncFailed(operation) {
    const { collection, entityId, type, data, timestamp } = operation
    await this.updateLocalStorage(collection, entityId, {
      ...data,
      ...(type === 'delete' ? { _deleted: true } : {}),
      _localId: entityId,
      _lastModified: timestamp,
      _syncStatus: 'failed'
    })
  }

  // Server operations

  async createOnServer(collection, data) {
    const response = await fetch(`/api/${collection}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    })
    if (!response.ok) {
      throw new Error(`Server create failed: ${response.statusText}`)
    }
    return response.json()
  }

  async updateOnServer(collection, id, data) {
    const response = await fetch(`/api/${collection}/${encodeURIComponent(id)}`, {
      method: 'PUT',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    })
    if (!response.ok) {
      throw new Error(`Server update failed: ${response.statusText}`)
    }
    return response.json()
  }

  async deleteOnServer(collection, id) {
    const response = await fetch(`/api/${collection}/${encodeURIComponent(id)}`, {
      method: 'DELETE'
    })
    if (!response.ok) {
      throw new Error(`Server delete failed: ${response.statusText}`)
    }
  }

  async fetchFromServer(collection, filter) {
    // NOTE(review): operator filters ({ operator, value }) serialize as "[object Object]" here —
    // confirm the query format the server expects before relying on them.
    const params = new URLSearchParams(filter)
    const response = await fetch(`/api/${collection}?${params}`)
    if (!response.ok) {
      throw new Error(`Server fetch failed: ${response.statusText}`)
    }
    return response.json()
  }

  /** Returns the client id kept in localStorage under 'clientId', creating it on first use. */
  getClientId() {
    let clientId = localStorage.getItem('clientId')
    if (!clientId) {
      clientId = generateId()
      localStorage.setItem('clientId', clientId)
    }
    return clientId
  }

  /**
   * Placeholder for the IndexedDB connection (one object store per collection
   * is expected by the methods above). Fails loudly instead of returning
   * undefined so callers get a clear message.
   * @throws {Error} always, until implemented
   */
  async getIndexedDB() {
    throw new Error('OfflineFirstDataManager.getIndexedDB() is not implemented')
  }

  /** Evaluates one filter operator; unknown operators match everything. */
  applyOperator(value, operator, operand) {
    const canInclude = (target) => target != null && typeof target.includes === 'function'
    switch (operator) {
      case 'eq': return value === operand
      case 'ne': return value !== operand
      case 'gt': return value > operand
      case 'gte': return value >= operand
      case 'lt': return value < operand
      case 'lte': return value <= operand
      case 'in': return canInclude(operand) && operand.includes(value)
      case 'contains': return canInclude(value) && value.includes(operand)
      default: return true
    }
  }
}
// Conflict resolver
/**
 * Resolves conflicts between a local record and its server counterpart using
 * a named strategy: 'server-wins', 'client-wins', 'last-modified-wins' or
 * 'merge' (the default).
 */
class ConflictResolver {
  constructor() {
    this.strategies = new Map()
    this.setupDefaultStrategies()
  }

  /** Registers the four built-in strategies. */
  setupDefaultStrategies() {
    // Server always wins
    this.strategies.set('server-wins', (local, server) => server)

    // Client always wins
    this.strategies.set('client-wins', (local, server) => local)

    // The record modified last wins; the server wins ties and when neither side has a usable time
    this.strategies.set('last-modified-wins', (local, server) => {
      const localTime = this.toTime(local.updatedAt || local._lastModified)
      const serverTime = this.toTime(server.updatedAt)
      return localTime > serverTime ? local : server
    })

    // Field-level merge: start from the server record and keep local changes
    this.strategies.set('merge', (local, server) => {
      const merged = { ...server }
      const timestampSuffix = '_timestamp'

      // Copy a local field together with its per-field timestamp so the two stay consistent
      const takeLocal = (key) => {
        merged[key] = local[key]
        const timestampKey = `${key}${timestampSuffix}`
        if (local[timestampKey] !== undefined) {
          merged[timestampKey] = local[timestampKey]
        }
      }

      Object.keys(local).forEach(key => {
        // `<field>_timestamp` keys travel with their field (see takeLocal), not on their own
        if (key.endsWith(timestampSuffix) && key.slice(0, -timestampSuffix.length) in local) {
          return
        }
        if (!key.startsWith('_') && local[key] === server[key]) {
          return
        }
        if (this.hasTimestamp(local, key) && this.hasTimestamp(server, key)) {
          // Both sides track this field: the newer value wins, the server wins ties
          if (this.getFieldTimestamp(local, key) > this.getFieldTimestamp(server, key)) {
            takeLocal(key)
          }
        } else {
          takeLocal(key)
        }
      })
      return merged
    })
  }

  /**
   * Adds or replaces a strategy.
   * @param {string} name
   * @param {(local: object, server: object) => object} resolver
   */
  registerStrategy(name, resolver) {
    if (typeof resolver !== 'function') {
      throw new TypeError('Conflict resolution strategy must be a function')
    }
    this.strategies.set(name, resolver)
  }

  /**
   * Resolves one conflict.
   * @param {object} local
   * @param {object} server
   * @param {string} [strategy='merge']
   * @returns {Promise<object>} the resolved record
   * @throws {Error} when the strategy name is not registered
   */
  async resolve(local, server, strategy = 'merge') {
    const resolver = this.strategies.get(strategy)
    if (!resolver) {
      throw new Error(`Unknown conflict resolution strategy: ${strategy}`)
    }
    // With only one side present there is nothing to reconcile
    if (local == null) return server
    if (server == null) return local
    return resolver(local, server)
  }

  /** True when the object has a `<field>_timestamp` property. */
  hasTimestamp(obj, field) {
    return obj[`${field}_timestamp`] !== undefined
  }

  /** Per-field timestamp, falling back to `_lastModified`, then 0. */
  getFieldTimestamp(obj, field) {
    return obj[`${field}_timestamp`] || obj._lastModified || 0
  }

  /** Converts a date-like value to epoch milliseconds; missing or unparseable values count as 0. */
  toTime(value) {
    if (value == null) return 0
    const time = new Date(value).getTime()
    return Number.isNaN(time) ? 0 : time
  }
}
// Using the offline-first manager in a Pinia store
/**
 * Pinia setup store on top of OfflineFirstDataManager for the 'items'
 * collection. Exposes read-only `items`, `syncStatus`
 * ('idle' | 'syncing' | 'error') and `pendingOperations`, plus the actions
 * below.
 */
export const useOfflineStore = defineStore('offline', () => {
  const dataManager = new OfflineFirstDataManager()

  // State
  const items = ref([])
  const syncStatus = ref('idle') // 'idle' | 'syncing' | 'error'
  const pendingOperations = ref(0)
  // Whether any operation in the current batch of overlapping operations failed
  let batchFailed = false

  /**
   * Runs `task` while keeping `pendingOperations` and `syncStatus` up to date.
   * When the last overlapping operation finishes, the status becomes 'error'
   * if any of them failed and 'idle' otherwise.
   */
  const track = async (task) => {
    if (pendingOperations.value === 0) {
      batchFailed = false
    }
    pendingOperations.value++
    syncStatus.value = 'syncing'
    try {
      return await task()
    } catch (error) {
      batchFailed = true
      throw error
    } finally {
      pendingOperations.value--
      if (pendingOperations.value === 0) {
        syncStatus.value = batchFailed ? 'error' : 'idle'
      }
    }
  }

  /** Creates an item locally (queued for sync) and returns its id. */
  const createItem = (data) => track(async () => {
    const id = await dataManager.createOrUpdate('items', null, data)
    items.value.push({ ...data, id, _syncStatus: 'pending' })
    return id
  })

  /** Updates an item locally (queued for sync). */
  const updateItem = (id, data) => track(async () => {
    await dataManager.createOrUpdate('items', id, data)
    const index = items.value.findIndex(item => item.id === id)
    if (index > -1) {
      items.value[index] = { ...items.value[index], ...data, _syncStatus: 'pending' }
    }
  })

  /** Deletes an item locally (queued for sync). */
  const deleteItem = (id) => track(async () => {
    await dataManager.delete('items', id)
    items.value = items.value.filter(item => item.id !== id)
  })

  /** Replaces `items` with the query result; sets 'error' on failure. */
  const queryItems = async (filter = {}) => {
    try {
      const results = await dataManager.query('items', filter)
      items.value = results
      // A successful query clears an earlier error unless other work is still running
      if (syncStatus.value === 'error' && pendingOperations.value === 0) {
        syncStatus.value = 'idle'
      }
      return results
    } catch (error) {
      syncStatus.value = 'error'
      throw error
    }
  }

  /** Processes the sync queue, then re-queries to pick up the latest data. */
  const forceSync = () => track(async () => {
    await dataManager.processSyncQueue()
    await queryItems()
  })

  return {
    items: readonly(items),
    syncStatus: readonly(syncStatus),
    pendingOperations: readonly(pendingOperations),
    createItem,
    updateItem,
    deleteItem,
    queryItems,
    forceSync
  }
})
🔄 实时数据同步
WebSocket实时同步
// 实时数据同步管理器
/**
 * WebSocket client for real-time data changes: keeps a set of subscriptions,
 * queues outgoing messages while disconnected, and reconnects with
 * exponential backoff (up to `maxReconnectAttempts`).
 *
 * Depends on the global `generateId` for subscription ids.
 */
class RealTimeSyncManager {
  /**
   * @param {string} [url] WebSocket endpoint; defaults to the original hard-coded address
   */
  constructor(url = 'ws://localhost:8080/realtime') {
    this.url = url
    this.ws = null
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = 5
    this.reconnectDelay = 1000
    this.reconnectTimer = null
    this.closedByClient = false
    this.subscriptions = new Map()
    this.messageQueue = []
    this.isConnected = false
    this.connect()
  }

  /** Opens the socket and wires its event handlers. */
  connect() {
    this.closedByClient = false
    try {
      const socket = new WebSocket(this.url)
      this.ws = socket

      socket.onopen = () => {
        console.log('WebSocket connected')
        this.isConnected = true
        this.reconnectAttempts = 0
        // resubscribe() re-sends every active subscription, so queued subscribe
        // requests would only register the same subscription twice
        this.messageQueue = this.messageQueue.filter(message => message.type !== 'subscribe')
        this.flushMessageQueue()
        this.resubscribe()
      }

      socket.onmessage = (event) => {
        let message
        try {
          message = JSON.parse(event.data)
        } catch (error) {
          console.error('Invalid WebSocket message:', error)
          return
        }
        this.handleMessage(message)
      }

      socket.onclose = () => {
        // Ignore close events from a socket that has already been replaced
        if (this.ws !== socket) return
        console.log('WebSocket disconnected')
        this.isConnected = false
        // An explicit close() must not trigger the automatic reconnect
        if (!this.closedByClient) {
          this.scheduleReconnect()
        }
      }

      socket.onerror = (error) => {
        console.error('WebSocket error:', error)
      }
    } catch (error) {
      console.error('Failed to create WebSocket connection:', error)
      this.scheduleReconnect()
    }
  }

  /** Schedules the next connection attempt, doubling the delay each time. */
  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached')
      return
    }
    this.reconnectAttempts++
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1)
    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
    }, delay)
  }

  /**
   * Subscribes to changes in a collection.
   * @param {string} collection
   * @param {object} [filter]
   * @param {(change: {type: string, data: *, metadata: *, collection: string}) => void} callback
   * @returns {*} the subscription id
   */
  subscribe(collection, filter = {}, callback) {
    const subscriptionId = generateId()
    this.subscriptions.set(subscriptionId, { collection, filter, callback })
    this.send({ type: 'subscribe', subscriptionId, collection, filter })
    return subscriptionId
  }

  /** Cancels a subscription; unknown ids are ignored. */
  unsubscribe(subscriptionId) {
    if (!this.subscriptions.has(subscriptionId)) return
    this.subscriptions.delete(subscriptionId)
    this.send({ type: 'unsubscribe', subscriptionId })
  }

  /** Sends the message now if the socket is open, otherwise queues it. */
  send(message) {
    if (this.isConnected && this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message))
    } else {
      this.messageQueue.push(message)
    }
  }

  /** Dispatches one parsed server message by its `type`. */
  handleMessage(message) {
    if (message === null || typeof message !== 'object') {
      console.warn('Unknown message type:', undefined)
      return
    }
    switch (message.type) {
      case 'data_change':
        this.handleDataChange(message)
        break
      case 'subscription_confirmed':
        console.log(`Subscription confirmed: ${message.subscriptionId}`)
        break
      case 'error':
        console.error('Server error:', message.error)
        break
      default:
        console.warn('Unknown message type:', message.type)
    }
  }

  /** Forwards a data change to the matching subscription's callback. */
  handleDataChange(message) {
    const { subscriptionId, changeType, data, metadata } = message
    const subscription = this.subscriptions.get(subscriptionId)
    if (!subscription) return
    try {
      subscription.callback({
        type: changeType, // 'create' | 'update' | 'delete'
        data,
        metadata,
        collection: subscription.collection
      })
    } catch (error) {
      console.error('Error in subscription callback:', error)
    }
  }

  /** Sends everything that was queued while disconnected. */
  flushMessageQueue() {
    // Swap the queue first: if send() re-queues a message, it lands in the new
    // array and this loop still terminates
    const pending = this.messageQueue
    this.messageQueue = []
    for (const message of pending) {
      this.send(message)
    }
  }

  /** Re-sends a subscribe request for every active subscription. */
  resubscribe() {
    for (const [subscriptionId, subscription] of this.subscriptions) {
      this.send({
        type: 'subscribe',
        subscriptionId,
        collection: subscription.collection,
        filter: subscription.filter
      })
    }
  }

  /** Closes the connection and cancels any pending reconnect. */
  close() {
    this.closedByClient = true
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.ws) {
      this.ws.close()
    }
  }
}
// Using real-time sync in a Pinia store
/**
 * Pinia setup store that mirrors server-pushed changes into `items` through a
 * RealTimeSyncManager. Exposes read-only `items`, `connectionStatus`
 * ('connecting' | 'connected' | 'disconnected') and `subscriptions`, plus
 * `subscribeToCollection`, `unsubscribeFromCollection` and `cleanup`.
 */
export const useRealTimeStore = defineStore('realtime', () => {
  const syncManager = new RealTimeSyncManager()

  // State
  const items = ref([])
  const connectionStatus = ref(syncManager.isConnected ? 'connected' : 'connecting')
  const subscriptions = ref(new Map())

  // `isConnected` is a plain property on a non-reactive object, so a watcher on it
  // would never fire. Replace it with an accessor that updates the status ref
  // whenever the manager assigns to it.
  let connected = Boolean(syncManager.isConnected)
  Object.defineProperty(syncManager, 'isConnected', {
    configurable: true,
    enumerable: true,
    get: () => connected,
    set: (value) => {
      connected = Boolean(value)
      connectionStatus.value = connected ? 'connected' : 'disconnected'
    }
  })

  /** Applies one change to `items` and publishes a 'DataChanged' event when an event bus exists. */
  const handleDataChange = (change) => {
    const { type, data, collection } = change
    if (type === 'create') {
      // Skip records that are already present
      if (!items.value.find(item => item.id === data.id)) {
        items.value.push(data)
      }
    } else if (type === 'update') {
      const updateIndex = items.value.findIndex(item => item.id === data.id)
      if (updateIndex > -1) {
        items.value[updateIndex] = { ...items.value[updateIndex], ...data }
      }
    } else if (type === 'delete') {
      items.value = items.value.filter(item => item.id !== data.id)
    }
    // `eventBus` is not defined in this block; publish only when a global one is available
    if (typeof eventBus !== 'undefined') {
      eventBus.publish('DataChanged', { collection, type, data })
    }
  }

  /**
   * Subscribes to a collection and records the subscription.
   * @returns {*} the subscription id
   */
  const subscribeToCollection = (collection, filter = {}) => {
    const subscriptionId = syncManager.subscribe(collection, filter, (change) => {
      handleDataChange(change)
    })
    subscriptions.value.set(subscriptionId, {
      collection,
      filter,
      createdAt: Date.now()
    })
    return subscriptionId
  }

  /** Cancels one subscription. */
  const unsubscribeFromCollection = (subscriptionId) => {
    syncManager.unsubscribe(subscriptionId)
    subscriptions.value.delete(subscriptionId)
  }

  /** Cancels every subscription and closes the connection. */
  const cleanup = () => {
    for (const subscriptionId of subscriptions.value.keys()) {
      syncManager.unsubscribe(subscriptionId)
    }
    subscriptions.value.clear()
    syncManager.close()
  }

  // NOTE(review): this hook is registered from a store setup function rather than a
  // component; confirm it attaches to the intended component instance.
  onBeforeUnmount(() => {
    cleanup()
  })

  return {
    items: readonly(items),
    connectionStatus: readonly(connectionStatus),
    subscriptions: readonly(subscriptions),
    subscribeToCollection,
    unsubscribeFromCollection,
    cleanup
  }
})
📊 数据版本控制和增量同步
版本控制系统
// 数据版本控制管理器
/**
 * In-memory version history for entities: every saved state becomes a
 * numbered, immutable-by-copy version, with diffing, delta application,
 * rollback and a bounded change log.
 *
 * Data is copied with a JSON round-trip, so only JSON-serializable values are
 * preserved. Depends on the global `generateId` for version ids.
 */
class DataVersionManager {
  constructor() {
    this.versionStore = new Map()
    this.changeLog = []
    this.maxChangeLogSize = 1000
  }

  /**
   * Stores a new version of the entity.
   * @param {*} entityId
   * @param {*} data JSON-serializable entity state
   * @param {{author?: string, comment?: string}} [metadata]
   * @returns {object} the stored version record
   * @throws {TypeError} when `data` is undefined
   */
  createVersion(entityId, data, metadata = {}) {
    if (data === undefined) {
      throw new TypeError('Cannot create a version from undefined data')
    }
    const version = {
      id: generateId(),
      entityId,
      data: this.deepClone(data),
      version: this.getNextVersion(entityId),
      timestamp: Date.now(),
      author: metadata.author || 'system',
      comment: metadata.comment || '',
      checksum: this.calculateChecksum(data)
    }
    if (!this.versionStore.has(entityId)) {
      this.versionStore.set(entityId, [])
    }
    this.versionStore.get(entityId).push(version)
    this.addToChangeLog({
      type: 'version_created',
      entityId,
      versionId: version.id,
      timestamp: version.timestamp
    })
    return version
  }

  /** Returns the entity's versions, oldest first, as a new array. */
  getVersions(entityId) {
    const versions = this.versionStore.get(entityId)
    // Copy the array so callers cannot reorder or truncate the stored history
    return versions ? [...versions] : []
  }

  /** Returns the version with the given number, or undefined. */
  getVersion(entityId, versionNumber) {
    return this.getVersions(entityId).find(v => v.version === versionNumber)
  }

  /** Returns the most recent version, or null when there is none. */
  getLatestVersion(entityId) {
    const versions = this.getVersions(entityId)
    return versions.length > 0 ? versions[versions.length - 1] : null
  }

  /**
   * Diffs two stored versions.
   * @throws {Error} when either version does not exist
   */
  compareVersions(entityId, fromVersion, toVersion) {
    const from = this.getVersion(entityId, fromVersion)
    const to = this.getVersion(entityId, toVersion)
    if (!from || !to) {
      throw new Error('Version not found')
    }
    return this.calculateDiff(from.data, to.data)
  }

  /**
   * Applies a delta on top of a stored version and saves the result as a new version.
   * @throws {Error} when the base version does not exist
   */
  applyDelta(entityId, baseVersion, delta) {
    const base = this.getVersion(entityId, baseVersion)
    if (!base) {
      throw new Error('Base version not found')
    }
    const updatedData = this.applyDeltaToData(base.data, delta)
    return this.createVersion(entityId, updatedData, {
      comment: `Applied delta from version ${baseVersion}`
    })
  }

  /**
   * Saves a copy of an earlier version as the newest version.
   * @throws {Error} when the target version does not exist
   */
  rollback(entityId, targetVersion) {
    const target = this.getVersion(entityId, targetVersion)
    if (!target) {
      throw new Error('Target version not found')
    }
    return this.createVersion(entityId, target.data, {
      comment: `Rolled back to version ${targetVersion}`
    })
  }

  /**
   * Computes a top-level diff between two objects.
   * @returns {{added: object, modified: object, deleted: object}} values are copies,
   *   so editing the diff never changes the inputs
   */
  calculateDiff(oldData, newData) {
    const before = this.isRecord(oldData) ? oldData : {}
    const after = this.isRecord(newData) ? newData : {}
    const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key)
    const diff = { added: {}, modified: {}, deleted: {} }

    for (const key of Object.keys(after)) {
      if (!hasOwn(before, key)) {
        diff.added[key] = this.deepClone(after[key])
      } else if (!this.isSameValue(before[key], after[key])) {
        diff.modified[key] = {
          old: this.deepClone(before[key]),
          new: this.deepClone(after[key])
        }
      }
    }
    for (const key of Object.keys(before)) {
      if (!hasOwn(after, key)) {
        diff.deleted[key] = this.deepClone(before[key])
      }
    }
    return diff
  }

  /**
   * Returns a new object with the delta applied to a copy of `baseData`.
   * A missing base is treated as an empty object; missing delta sections are skipped.
   */
  applyDeltaToData(baseData, delta) {
    const result = this.isRecord(baseData) ? this.deepClone(baseData) : {}
    const { added = {}, modified = {}, deleted = {} } = delta || {}
    Object.assign(result, this.deepClone(added))
    for (const key of Object.keys(modified)) {
      result[key] = this.deepClone(modified[key].new)
    }
    for (const key of Object.keys(deleted)) {
      delete result[key]
    }
    return result
  }

  /** Next version number for the entity: last version + 1, starting at 1. */
  getNextVersion(entityId) {
    const versions = this.versionStore.get(entityId)
    return versions && versions.length > 0 ? versions[versions.length - 1].version + 1 : 1
  }

  /** 32-bit rolling hash of the JSON form of `data`, as a (possibly negative) hex string. */
  calculateChecksum(data) {
    const str = JSON.stringify(data)
    let hash = 0
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) - hash) + str.charCodeAt(i)
      hash |= 0 // keep within 32-bit integer range
    }
    return hash.toString(16)
  }

  /** JSON round-trip copy; values with no JSON form (e.g. undefined) come back as undefined. */
  deepClone(obj) {
    const json = JSON.stringify(obj)
    return json === undefined ? undefined : JSON.parse(json)
  }

  /** True for non-null objects (including arrays). */
  isRecord(value) {
    return value !== null && typeof value === 'object'
  }

  /** Compares two values by their JSON form, ignoring object key order. */
  isSameValue(a, b) {
    return JSON.stringify(this.canonicalize(a)) === JSON.stringify(this.canonicalize(b))
  }

  /** Rebuilds plain objects with sorted keys so equal content serializes identically. */
  canonicalize(value) {
    if (Array.isArray(value)) {
      return value.map(item => this.canonicalize(item))
    }
    // Objects with toJSON (e.g. Date) keep their own serialization
    if (this.isRecord(value) && typeof value.toJSON !== 'function') {
      const sorted = {}
      for (const key of Object.keys(value).sort()) {
        sorted[key] = this.canonicalize(value[key])
      }
      return sorted
    }
    return value
  }

  /** Appends an entry, dropping the oldest one once the log exceeds `maxChangeLogSize`. */
  addToChangeLog(entry) {
    this.changeLog.push(entry)
    if (this.changeLog.length > this.maxChangeLogSize) {
      this.changeLog.shift()
    }
  }

  /**
   * Returns change-log entries as a new array, optionally filtered.
   * @param {{entityId?: *, since?: number}} [filter]
   */
  getChangeLog(filter = {}) {
    let log = [...this.changeLog]
    if (filter.entityId) {
      log = log.filter(entry => entry.entityId === filter.entityId)
    }
    if (filter.since) {
      log = log.filter(entry => entry.timestamp >= filter.since)
    }
    return log
  }
}
// Incremental sync manager
/**
 * Builds and applies incremental updates (full snapshots and deltas) on top of
 * a version manager, and remembers the last applied state per entity.
 *
 * The version manager must provide `getVersions`, `getVersion`,
 * `calculateDiff` and `applyDeltaToData`.
 */
class IncrementalSyncManager {
  constructor(versionManager) {
    this.versionManager = versionManager
    this.syncState = new Map() // last applied { data, version, lastSync } per entity
  }

  /**
   * Lists the updates needed to bring a peer from `lastSyncVersion` to the latest version.
   * @param {*} entityId
   * @param {number} [lastSyncVersion=0] 0 means the peer has nothing yet
   * @returns {Promise<{hasUpdates: boolean, updates: object[]}>}
   */
  async getIncrementalUpdates(entityId, lastSyncVersion = 0) {
    const since = Number(lastSyncVersion) || 0
    const versions = this.versionManager.getVersions(entityId)
    const updatesNeeded = versions.filter(v => v.version > since)
    if (updatesNeeded.length === 0) {
      return { hasUpdates: false, updates: [] }
    }

    const updates = []
    let currentVersion = since
    for (const version of updatesNeeded) {
      const previousVersion = currentVersion === 0
        ? null
        : this.versionManager.getVersion(entityId, currentVersion)
      if (previousVersion) {
        updates.push({
          type: 'delta',
          version: version.version,
          baseVersion: currentVersion,
          delta: this.versionManager.calculateDiff(previousVersion.data, version.data),
          timestamp: version.timestamp
        })
      } else {
        // First sync, or the base version is unavailable: send the full data so the
        // receiver is never left with a gap that makes later deltas unappliable
        updates.push({
          type: 'full',
          version: version.version,
          data: version.data,
          timestamp: version.timestamp
        })
      }
      currentVersion = version.version
    }
    return { hasUpdates: true, updates }
  }

  /**
   * Applies updates in order on top of the stored sync state.
   * The stored state is only replaced after every update has applied cleanly.
   * @returns {Promise<{data: *, version: number}>}
   * @throws {Error} when a delta's `baseVersion` does not match the current version
   */
  async applyIncrementalUpdates(entityId, updates) {
    let currentData = null
    let currentVersion = 0
    const syncState = this.syncState.get(entityId)
    if (syncState) {
      currentData = syncState.data
      currentVersion = syncState.version
    }

    for (const update of updates) {
      if (update.type === 'full') {
        // Copy so later edits to the payload cannot change the stored state
        currentData = this.cloneData(update.data)
        currentVersion = update.version
      } else if (update.type === 'delta') {
        if (currentVersion !== update.baseVersion) {
          throw new Error(`Version mismatch: expected ${update.baseVersion}, got ${currentVersion}`)
        }
        currentData = this.versionManager.applyDeltaToData(currentData, update.delta)
        currentVersion = update.version
      }
    }

    this.syncState.set(entityId, {
      data: currentData,
      version: currentVersion,
      lastSync: Date.now()
    })
    // Hand back a copy so the caller cannot mutate the stored state
    return {
      data: this.cloneData(currentData),
      version: currentVersion
    }
  }

  /** True when nothing has been synced yet or the server is ahead. */
  needsSync(entityId, serverVersion) {
    const syncState = this.syncState.get(entityId)
    return !syncState || syncState.version < serverVersion
  }

  /** Returns the stored sync state, or undefined. */
  getSyncState(entityId) {
    return this.syncState.get(entityId)
  }

  /** Forgets the stored sync state for the entity. */
  resetSyncState(entityId) {
    this.syncState.delete(entityId)
  }

  /** JSON round-trip copy; values with no JSON form are returned unchanged. */
  cloneData(value) {
    const json = JSON.stringify(value)
    return json === undefined ? value : JSON.parse(json)
  }
}
// Using version control and incremental sync in a Pinia store
/**
 * Pinia setup store combining DataVersionManager (local history) with
 * IncrementalSyncManager (pulling server updates). Exposes read-only
 * `entities` and `syncStatus` maps plus the actions below.
 *
 * Per-entity status values written here: 'pending', 'syncing', 'synced', 'error'.
 */
export const useVersionedStore = defineStore('versioned', () => {
  const versionManager = new DataVersionManager()
  const syncManager = new IncrementalSyncManager(versionManager)

  // State
  const entities = ref(new Map())
  const syncStatus = ref(new Map())
  // Pulls currently running, keyed by entity id, so overlapping calls share one run
  const inFlightSyncs = new Map()

  /** Records a new version and marks the entity as pending sync. */
  const saveEntity = async (entityId, data, metadata = {}) => {
    const version = versionManager.createVersion(entityId, data, metadata)
    entities.value.set(entityId, {
      ...data,
      _version: version.version,
      _lastModified: version.timestamp
    })
    syncStatus.value.set(entityId, 'pending')
    return version
  }

  /** Returns the current entity, or undefined. */
  const getEntity = (entityId) => entities.value.get(entityId)

  /** Returns the entity's version history. */
  const getEntityVersions = (entityId) => versionManager.getVersions(entityId)

  /** Pulls and applies server updates for one entity. */
  const pullEntity = async (entityId) => {
    const previousStatus = syncStatus.value.get(entityId)
    let applied = false
    try {
      syncStatus.value.set(entityId, 'syncing')
      const serverInfo = await fetchServerVersionInfo(entityId)
      if (syncManager.needsSync(entityId, serverInfo.version)) {
        const currentState = syncManager.getSyncState(entityId)
        const lastSyncVersion = currentState ? currentState.version : 0
        const serverUpdates = await fetchIncrementalUpdates(entityId, lastSyncVersion)
        if (serverUpdates.hasUpdates) {
          const result = await syncManager.applyIncrementalUpdates(entityId, serverUpdates.updates)
          entities.value.set(entityId, {
            ...result.data,
            _version: result.version,
            _lastSync: Date.now()
          })
          applied = true
        }
      }
      // This action only pulls. If nothing was applied, local changes that were
      // pending before (or saved during) the pull are still unsent, so keep 'pending'.
      const hasLocalChanges =
        previousStatus === 'pending' || syncStatus.value.get(entityId) === 'pending'
      syncStatus.value.set(entityId, !applied && hasLocalChanges ? 'pending' : 'synced')
    } catch (error) {
      syncStatus.value.set(entityId, 'error')
      throw error
    }
  }

  /**
   * Syncs one entity with the server. Calls made while a sync for the same
   * entity is running receive that run's promise instead of starting another.
   * @returns {Promise<void>}
   */
  const syncEntity = (entityId) => {
    const running = inFlightSyncs.get(entityId)
    if (running) return running
    const task = pullEntity(entityId).finally(() => {
      inFlightSyncs.delete(entityId)
    })
    inFlightSyncs.set(entityId, task)
    return task
  }

  /** Rolls the entity back to an earlier version and marks it as pending sync. */
  const rollbackEntity = async (entityId, targetVersion) => {
    const rolledBack = versionManager.rollback(entityId, targetVersion)
    entities.value.set(entityId, {
      ...rolledBack.data,
      _version: rolledBack.version,
      _lastModified: rolledBack.timestamp
    })
    syncStatus.value.set(entityId, 'pending')
    return rolledBack
  }

  return {
    entities: readonly(entities),
    syncStatus: readonly(syncStatus),
    saveEntity,
    getEntity,
    getEntityVersions,
    syncEntity,
    rollbackEntity
  }
})
// Helper functions
/**
 * Fetches the server's version info for an entity from
 * `/api/entities/<entityId>/version`.
 * @param {*} entityId
 * @returns {Promise<*>} the parsed JSON response body
 * @throws {Error} when the response status is not OK
 */
async function fetchServerVersionInfo(entityId) {
  // Encode the id so characters like '/' or '?' cannot change the request path
  const response = await fetch(`/api/entities/${encodeURIComponent(entityId)}/version`)
  if (!response.ok) {
    throw new Error(`Server version fetch failed: ${response.statusText}`)
  }
  return response.json()
}

/**
 * Fetches the updates recorded after `lastSyncVersion` from
 * `/api/entities/<entityId>/updates?since=<lastSyncVersion>`.
 * @param {*} entityId
 * @param {number} lastSyncVersion
 * @returns {Promise<*>} the parsed JSON response body
 * @throws {Error} when the response status is not OK
 */
async function fetchIncrementalUpdates(entityId, lastSyncVersion) {
  const path = `/api/entities/${encodeURIComponent(entityId)}/updates`
  const response = await fetch(`${path}?since=${encodeURIComponent(lastSyncVersion)}`)
  if (!response.ok) {
    throw new Error(`Server updates fetch failed: ${response.statusText}`)
  }
  return response.json()
}
📝 总结
数据持久化与同步是现代Web应用的核心基础设施。通过本文的学习,你应该掌握了:
持久化架构:
- 多层次数据持久化的设计原则
- 不同存储层的特点和适用场景
- 容量管理和数据清理策略
离线优先:
- 离线数据管理的完整方案
- 同步队列和冲突解决机制
- 网络状态感知和自动同步
实时同步:
- WebSocket实时数据同步
- 订阅模式和事件驱动更新
- 连接管理和错误恢复
版本控制:
- 数据版本控制系统的实现
- 增量同步和差异计算
- 回滚和历史管理功能
最佳实践:
- 大规模应用的数据管理架构
- 性能优化和资源管理
- 数据一致性和可靠性保证
掌握这些技术将帮助你构建健壮、高性能的数据管理系统,特别是在处理复杂的企业级应用数据需求时。
至此,"路由与状态管理"系列已经完成。这个系列涵盖了从基础的Vue Router使用到复杂的企业级状态管理架构,为构建大型Vue 3应用提供了完整的解决方案。