skynet源码学习-skynet_mq队列
skynet源码学习-skynet_mq队列
- 核心数据结构分析
- 核心接口详解
- 设计优势分析
- 工作流程详解
- 消息传递全流程
- 队列扩容流程
核心数据结构分析
- 消息结构 (skynet_message)
struct skynet_message {uint32_t source; // 消息来源服务句柄int session; // 会话ID(用于RPC)void * data; // 消息数据指针size_t sz; // 消息大小(含类型信息)
};
- 消息类型编码:通过sz的高8位存储
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
- 服务消息队列 (message_queue)
struct message_queue {struct spinlock lock; // 自旋锁uint32_t handle; // 所属服务句柄int cap; // 队列容量int head; // 队列头指针int tail; // 队列尾指针int release; // 释放标记int in_global; // 是否在全局队列中int overload; // 过载计数int overload_threshold; // 过载阈值(动态调整)struct skynet_message *queue; // 环形缓冲区struct message_queue *next; // 全局队列链表指针
};
- 全局队列 (global_queue)
struct global_queue {struct message_queue *head; // 队列头struct message_queue *tail; // 队列尾struct spinlock lock; // 自旋锁
};
- 全局单例:static struct global_queue *Q = NULL;
核心接口详解
- void skynet_mq_init()
void
skynet_mq_init() {struct global_queue *q = skynet_malloc(sizeof(*q));memset(q,0,sizeof(*q));SPIN_INIT(q);Q=q;
}
- 功能:初始化全局消息队列系统
- 流程:
- 分配全局队列内存
- 初始化头尾指针为NULL
- 初始化自旋锁
- 调用时机:Skynet启动时
- struct message_queue * skynet_mq_create(uint32_t handle)
struct message_queue *
skynet_mq_create(uint32_t handle) {struct message_queue *q = skynet_malloc(sizeof(*q));q->handle = handle;q->cap = DEFAULT_QUEUE_SIZE;q->head = 0;q->tail = 0;SPIN_INIT(q)// When the queue is create (always between service create and service init) ,// set in_global flag to avoid push it to global queue .// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.q->in_global = MQ_IN_GLOBAL;q->release = 0;q->overload = 0;q->overload_threshold = MQ_OVERLOAD;q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);q->next = NULL;return q;
}
-
功能:创建服务私有消息队列
-
参数:handle - 服务句柄
-
流程:
-
初始值:
- 容量:64
- in_global=1(初始状态)
- overload_threshold=1024
- void skynet_mq_push(struct message_queue *q, struct skynet_message *message)
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {assert(message);SPIN_LOCK(q)q->queue[q->tail] = *message;if (++ q->tail >= q->cap) {q->tail = 0;}if (q->head == q->tail) {expand_queue(q);}if (q->in_global == 0) {q->in_global = MQ_IN_GLOBAL;skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
- 功能:向服务队列推送消息
- 核心逻辑:
SPIN_LOCK(q)
1. 消息存入环形缓冲区尾部
2. 如果缓冲区满则扩容(expand_queue)
3. 若队列不在全局队列中:q->in_global = MQ_IN_GLOBAL调用 skynet_globalmq_push(q)
SPIN_UNLOCK(q)
- int skynet_mq_pop(struct message_queue *q, struct skynet_message *message)
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {int ret = 1;SPIN_LOCK(q)if (q->head != q->tail) {*message = q->queue[q->head++];ret = 0;int head = q->head;int tail = q->tail;int cap = q->cap;if (head >= cap) {q->head = head = 0;}int length = tail - head;if (length < 0) {length += cap;}while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2;}} else {// reset overload_threshold when queue is emptyq->overload_threshold = MQ_OVERLOAD;}if (ret) {q->in_global = 0;}SPIN_UNLOCK(q)return ret;
}
- 功能:从服务队列取出消息
- 返回值:0成功,1无消息
- 核心逻辑:
SPIN_LOCK(q)
if (队列非空) {1. 复制头部消息到*message2. 移动头指针3. 动态调整过载阈值:当队列长度 > overload_threshold 时:overload = 当前长度overload_threshold *= 2
} else {重置overload_threshold = MQ_OVERLOAD标记q->in_global = 0(移出全局队列)
}
SPIN_UNLOCK(q)
- void skynet_globalmq_push(struct message_queue *queue)
void
skynet_globalmq_push(struct message_queue * queue) {struct global_queue *q= Q;SPIN_LOCK(q)assert(queue->next == NULL);if(q->tail) {q->tail->next = queue;q->tail = queue;} else {q->head = q->tail = queue;}SPIN_UNLOCK(q)
}
-
功能:将服务队列加入全局队列
-
流程:
-
设计要点:全局队列是服务队列的链表
- struct message_queue * skynet_globalmq_pop()
struct message_queue *
skynet_globalmq_pop() {struct global_queue *q = Q;SPIN_LOCK(q)struct message_queue *mq = q->head;if(mq) {q->head = mq->next;if(q->head == NULL) {assert(mq == q->tail);q->tail = NULL;}mq->next = NULL;}SPIN_UNLOCK(q)return mq;
}
- 功能:从全局队列获取一个待处理的服务队列
- 流程:
SPIN_LOCK(Q)
1. 从链表头部取出一个服务队列
2. 调整链表头指针
SPIN_UNLOCK(Q)
返回服务队列指针
- void skynet_mq_mark_release(struct message_queue *q)
void
skynet_mq_mark_release(struct message_queue *q) {SPIN_LOCK(q)assert(q->release == 0);q->release = 1;if (q->in_global != MQ_IN_GLOBAL) {skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
- 功能:标记服务队列待释放
- 流程:
SPIN_LOCK(q)
1. 设置 q->release = 1
2. 若不在全局队列中,加入全局队列
SPIN_UNLOCK(q)
- 目的:确保待释放队列能被正确处理
- void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud)
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {SPIN_LOCK(q)if (q->release) {SPIN_UNLOCK(q)_drop_queue(q, drop_func, ud);} else {skynet_globalmq_push(q);SPIN_UNLOCK(q)}
}
- 功能:释放服务队列资源
- 参数:
- drop_func:消息丢弃回调函数
- ud:用户数据
- 流程:
- 辅助接口
- uint32_t skynet_mq_handle():获取队列所属服务句柄
uint32_t
skynet_mq_handle(struct message_queue *q) {return q->handle;
}
- int skynet_mq_length():计算当前队列消息数
int
skynet_mq_length(struct message_queue *q) {int head, tail,cap;SPIN_LOCK(q)head = q->head;tail = q->tail;cap = q->cap;SPIN_UNLOCK(q)if (head <= tail) {return tail - head;}return tail + cap - head;
}
- int skynet_mq_overload():获取并重置过载计数
int
skynet_mq_overload(struct message_queue *q) {if (q->overload) {int overload = q->overload;q->overload = 0;return overload;} return 0;
}
设计优势分析
- 三级队列结构
- 优势:
- 全局队列:O(1)获取待处理服务
- 服务队列:隔离不同服务的消息
- 避免全局锁竞争
- 环形缓冲区设计
- 内存布局:
[头] 消息1 | 消息2 | ... | 消息N [尾]
- 优势:
- 避免内存碎片
- 动态扩容(翻倍策略)
- 高效头尾指针操作
- 智能过载处理
// 动态阈值调整
while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2; // 指数退避
}
- 优势:
- 避免频繁触发过载检测
- 指数退避减少检测开销
- 提供过载监控接口
- 精细锁控制
- 锁策略:
资源 | 锁类型 | 粒度 |
---|---|---|
全局队列 | 自旋锁 | 全局 |
服务队列 | 自旋锁 | 单个服务 |
消息处理 | 无锁 | - |
- 优势:
- 服务间并行处理
- 最小化临界区
- 优雅释放机制
- 标记阶段:skynet_mq_mark_release
- 清理阶段:skynet_mq_release
- 消息回调:通过drop_func处理残留消息
- 优势:
- 安全释放资源
- 避免消息丢失
- 支持自定义清理逻辑