Unity EventCenter 消息中心的设计与实现
在开发过程中,想要传递信号和数据,就得在不同模块之间实现通信。直接通过单例调用虽然简单,但会导致代码高度耦合,难以维护。消息中心提供了一种松耦合的通信方式:发布者不需要知道谁接收事件,接收者不需要知道事件来自哪里,由此减少模块间的直接依赖,便于扩展和维护。
架构
%% EventCenter 架构图
graph TDsubgraph 事件中心核心A[EventCenter] --> B[ConcurrentDictionary eventKey:callback]endsubgraph 调用接口A --> E[添加监听 AddListener]A --> F[移除监听 RemoveListener]A --> G[同步触发 SyncBroadcast]A --> H[异步触发 Broadcast]endsubgraph 线程管理G -->|立即执行| J[当前线程]H -->|主线程队列延迟| K[主线程]endsubgraph 异常处理G --> L[Try-Catch块]H --> LL --> M[打印错误日志]M --> N[继续触发剩余回调]end
数据结构
我们需要一个数据结构来存储eventKey与callback的映射关系,可以使用字典。
如果使用Dictionary,当多个线程同时注册或触发事件,可能导致数据异常,因此我们可以使用ConcurrentDictionary,它是门为高并发场景设计的线程安全集合,内置原子操作,无需手动加锁。
ConcurrentDictionary 提供以下方法:
bool TryAdd(TKey key, TValue value);
bool TryRemove(TKey key, out TValue value);
bool TryGetValue(TKey key, out TValue value);
bool ContainsKey(TKey key);
思考一下,可以使用 ConcurrentDictionary<string,delegate>存储eventKey与callback的映射,但很快便发现,这样一个key只能对应一个回调,不能满足多个模块监听一个事件的应用场景。
于是我们尝试 ConcurrentDictionary<string,List<delegate>>,但是ConcurrentDictionary只能保证获取到 List<T> 的过程是安全的,修改 List<T> 仍然会存在线程不安全的问题。
假设有两个线程同时执行这段代码:
if (!eventDic.ContainsKey("Attack"))
{ eventDic["Attack"] = new List<Delegate>(); eventDic["Attack"].Add(callback);
}
时间 | 线程1 | 线程2 |
---|---|---|
t1 | 执行步骤1(判断"Attack"不存在) | - |
t2 | - | 执行步骤1(同样判断"Attack"不存在) |
t3 | 执行步骤2(创建新List) | - |
t4 | - | 执行步骤2(再次创建新List,覆盖线程1创建的List) |
t5 | 执行步骤3(向被覆盖的List添加handler1) | - |
t6 | - | 执行步骤3(向新List添加handler2) |
可以看到线程2覆盖了线程1创建的List,这可能会导致数据错误。
我们可以尝试使用ConcurrentDictionary<string,ConcurrentDictionary<Delegate,bool>>,嵌套的内层字典可以存储多个Delegate,并且修改操作都是线程安全的,对于内存字典的值,我们是用不上的,可以使用字节数最小的bool类型来占位。
广播
在广播时,我们遍历存储当前key所有委托的内层字典,并依次执行其回调函数。
因为我们不需要回调函数的返回值,所以我们把这些方法委托都从基类Delegate转换成无返回值类型的Action委托。
回调函数无参数:
public static void SyncBroadcast(string eventKey)
{if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){(callback as Action)?.Invoke();}}
}
回调函数带参数,使用泛型实现:
一般来说,三个参数就可以覆盖绝大多数的使用场景,所以我们只实现0~3个参数的方法。
public static void SyncBroadcast<T>(string eventKey, T data)
{if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){(callback as Action<T>)?.Invoke(data);}}
}
错误处理
如果某个回调抛出异常,会中断后续回调的执行,还需要增加 try-catch
包裹回调执行部分。
Broadcast与SyncBroadcast
对于SyncBroadcast,Invoke会在调用SyncBroadcast的线程执行。
但很多时候,我们需要在Unity主线程执行回调函数,可以使LoomManager.QueueOnMainThread方法把回调函数传给主线程执行。于是我们可以这样包装一下:
public static void Broadcast(string eventKey)
{LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey));
}
代码
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using UnityEngine;public static class EventCenter
{private static string TAG = "[EventCenter]";private static readonly ConcurrentDictionary<string, ConcurrentDictionary<Delegate, bool>>eventDic = new ConcurrentDictionary<string, ConcurrentDictionary<Delegate, bool>>();// 添加事件监听public static void AddListener(string eventKey, Action callback){if (!eventDic.ContainsKey(eventKey)){eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();}eventDic[eventKey].TryAdd(callback, false);}#region 有参public static void AddListener<T>(string eventKey, Action<T> callback){if (!eventDic.ContainsKey(eventKey)){eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();}eventDic[eventKey].TryAdd(callback, false);}public static void AddListener<T,U>(string eventKey, Action<T,U> callback){if (!eventDic.ContainsKey(eventKey)){eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();}eventDic[eventKey].TryAdd(callback, false);}public static void AddListener<T, U, V>(string eventKey, Action<T, U, V> callback){if (!eventDic.ContainsKey(eventKey)){eventDic[eventKey] = new ConcurrentDictionary<Delegate, bool>();}eventDic[eventKey].TryAdd(callback, false);}#endregion// 移除事件监听public static void RemoveListener(string eventKey, Action callback){if(eventDic.TryGetValue(eventKey,out ConcurrentDictionary<Delegate,bool> callbackList)){if (callbackList.ContainsKey(callback))callbackList.TryRemove(callback, out bool result);}}// 移除全部事件监听public static void Clear() => eventDic.Clear();#region 有参public static void RemoveListener<T>(string eventKey, Action<T> callback){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){if (callbackList.ContainsKey(callback))callbackList.TryRemove(callback, out bool result);}}public static void RemoveListener<T, U>(string eventKey, Action<T, U> callback){if(eventDic.TryGetValue(eventKey,out ConcurrentDictionary<Delegate,bool> callbackList)){if (callbackList.ContainsKey(callback))callbackList.TryRemove(callback, out bool result);}}public static void RemoveListener<T, U, V>(string eventKey, Action<T, U, V> callback){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){if (callbackList.ContainsKey(callback))callbackList.TryRemove(callback, out bool result);}}#endregion// 立即触发事件public static void SyncBroadcast(string eventKey){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){try{(callback as Action)?.Invoke();}catch(Exception ex){Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");}}}}#region 有参public static void SyncBroadcast<T>(string eventKey, T data){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){try{(callback as Action<T>)?.Invoke(data);}catch (Exception ex){Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");}}}}public static void SyncBroadcast<T, U>(string eventKey, T dataT, U dataU){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){try{(callback as Action<T, U>)?.Invoke(dataT, dataU);}catch (Exception ex){Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");}}}}public static void SyncBroadcast<T, U, V>(string eventKey, T dataT, U dataU, V dataV){if (eventDic.TryGetValue(eventKey, out ConcurrentDictionary<Delegate, bool> callbackList)){foreach (var callback in callbackList.Keys){try{(callback as Action<T, U, V>)?.Invoke(dataT, dataU, dataV);}catch (Exception ex){Debug.LogError(TAG + $"Event:{eventKey} Callback:{callback.Method.Name} Failed: {ex.Message}");}}}}#endregion// 在主线程触发事件public static void Broadcast(string eventKey){LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey));}#region 有参public static void Broadcast<T>(string eventKey, T data){LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, data));}public static void Broadcast<T, U>(string eventKey, T dataT, U dataU){LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, dataT, dataU));}public static void Broadcast<T, U, V>(string eventKey, T dataT, U dataU, V dataV){LoomManager.QueueOnMainThread(() => SyncBroadcast(eventKey, dataT, dataU, dataV));}#endregion
}