当前位置: 首页 > news >正文

skynet源码学习-skynet_mq队列

skynet源码学习-skynet_mq队列

  • 核心数据结构分析
  • 核心接口详解
  • 设计优势分析
  • 工作流程详解
    • 消息传递全流程
    • 队列扩容流程

核心数据结构分析

  1. 消息结构 (skynet_message)
struct skynet_message {uint32_t source;  // 消息来源服务句柄int session;      // 会话ID(用于RPC)void * data;      // 消息数据指针size_t sz;        // 消息大小(含类型信息)
};
  • 消息类型编码:通过sz的高8位存储
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
  1. 服务消息队列 (message_queue)
struct message_queue {struct spinlock lock;        // 自旋锁uint32_t handle;             // 所属服务句柄int cap;                     // 队列容量int head;                    // 队列头指针int tail;                    // 队列尾指针int release;                 // 释放标记int in_global;               // 是否在全局队列中int overload;                // 过载计数int overload_threshold;      // 过载阈值(动态调整)struct skynet_message *queue; // 环形缓冲区struct message_queue *next;   // 全局队列链表指针
};
  1. 全局队列 (global_queue)
struct global_queue {struct message_queue *head;  // 队列头struct message_queue *tail;  // 队列尾struct spinlock lock;        // 自旋锁
};
  • 全局单例:static struct global_queue *Q = NULL;

核心接口详解

  1. void skynet_mq_init()
void 
skynet_mq_init() {struct global_queue *q = skynet_malloc(sizeof(*q));memset(q,0,sizeof(*q));SPIN_INIT(q);Q=q;
}
  • 功能:初始化全局消息队列系统
  • 流程:
    1. 分配全局队列内存
    2. 初始化头尾指针为NULL
    3. 初始化自旋锁
  • 调用时机:Skynet启动时
  1. struct message_queue * skynet_mq_create(uint32_t handle)
struct message_queue * 
skynet_mq_create(uint32_t handle) {struct message_queue *q = skynet_malloc(sizeof(*q));q->handle = handle;q->cap = DEFAULT_QUEUE_SIZE;q->head = 0;q->tail = 0;SPIN_INIT(q)// When the queue is create (always between service create and service init) ,// set in_global flag to avoid push it to global queue .// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.q->in_global = MQ_IN_GLOBAL;q->release = 0;q->overload = 0;q->overload_threshold = MQ_OVERLOAD;q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);q->next = NULL;return q;
}
  • 功能:创建服务私有消息队列

  • 参数:handle - 服务句柄

  • 流程:在这里插入图片描述

  • 初始值:

    • 容量:64
    • in_global=1(初始状态)
    • overload_threshold=1024
  1. void skynet_mq_push(struct message_queue *q, struct skynet_message *message)
void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {assert(message);SPIN_LOCK(q)q->queue[q->tail] = *message;if (++ q->tail >= q->cap) {q->tail = 0;}if (q->head == q->tail) {expand_queue(q);}if (q->in_global == 0) {q->in_global = MQ_IN_GLOBAL;skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
  • 功能:向服务队列推送消息
  • 核心逻辑:
SPIN_LOCK(q)
1. 消息存入环形缓冲区尾部
2. 如果缓冲区满则扩容(expand_queue)
3. 若队列不在全局队列中:q->in_global = MQ_IN_GLOBAL调用 skynet_globalmq_push(q)
SPIN_UNLOCK(q)
  1. int skynet_mq_pop(struct message_queue *q, struct skynet_message *message)
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {int ret = 1;SPIN_LOCK(q)if (q->head != q->tail) {*message = q->queue[q->head++];ret = 0;int head = q->head;int tail = q->tail;int cap = q->cap;if (head >= cap) {q->head = head = 0;}int length = tail - head;if (length < 0) {length += cap;}while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2;}} else {// reset overload_threshold when queue is emptyq->overload_threshold = MQ_OVERLOAD;}if (ret) {q->in_global = 0;}SPIN_UNLOCK(q)return ret;
}
  • 功能:从服务队列取出消息
  • 返回值:0成功,1无消息
  • 核心逻辑:
SPIN_LOCK(q)
if (队列非空) {1. 复制头部消息到*message2. 移动头指针3. 动态调整过载阈值:当队列长度 > overload_threshold 时:overload = 当前长度overload_threshold *= 2
} else {重置overload_threshold = MQ_OVERLOAD标记q->in_global = 0(移出全局队列)
}
SPIN_UNLOCK(q)
  1. void skynet_globalmq_push(struct message_queue *queue)
void 
skynet_globalmq_push(struct message_queue * queue) {struct global_queue *q= Q;SPIN_LOCK(q)assert(queue->next == NULL);if(q->tail) {q->tail->next = queue;q->tail = queue;} else {q->head = q->tail = queue;}SPIN_UNLOCK(q)
}
  • 功能:将服务队列加入全局队列

  • 流程:
    在这里插入图片描述

  • 设计要点:全局队列是服务队列的链表

  1. struct message_queue * skynet_globalmq_pop()
struct message_queue * 
skynet_globalmq_pop() {struct global_queue *q = Q;SPIN_LOCK(q)struct message_queue *mq = q->head;if(mq) {q->head = mq->next;if(q->head == NULL) {assert(mq == q->tail);q->tail = NULL;}mq->next = NULL;}SPIN_UNLOCK(q)return mq;
}
  • 功能:从全局队列获取一个待处理的服务队列
  • 流程:
SPIN_LOCK(Q)
1. 从链表头部取出一个服务队列
2. 调整链表头指针
SPIN_UNLOCK(Q)
返回服务队列指针
  1. void skynet_mq_mark_release(struct message_queue *q)
void 
skynet_mq_mark_release(struct message_queue *q) {SPIN_LOCK(q)assert(q->release == 0);q->release = 1;if (q->in_global != MQ_IN_GLOBAL) {skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
  • 功能:标记服务队列待释放
  • 流程:
SPIN_LOCK(q)
1. 设置 q->release = 1
2. 若不在全局队列中,加入全局队列
SPIN_UNLOCK(q)
  • 目的:确保待释放队列能被正确处理
  1. void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud)
void 
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {SPIN_LOCK(q)if (q->release) {SPIN_UNLOCK(q)_drop_queue(q, drop_func, ud);} else {skynet_globalmq_push(q);SPIN_UNLOCK(q)}
}
  • 功能:释放服务队列资源
  • 参数:
    • drop_func:消息丢弃回调函数
    • ud:用户数据
  • 流程:
    在这里插入图片描述
  1. 辅助接口
  • uint32_t skynet_mq_handle():获取队列所属服务句柄
uint32_t 
skynet_mq_handle(struct message_queue *q) {return q->handle;
}
  • int skynet_mq_length():计算当前队列消息数
int
skynet_mq_length(struct message_queue *q) {int head, tail,cap;SPIN_LOCK(q)head = q->head;tail = q->tail;cap = q->cap;SPIN_UNLOCK(q)if (head <= tail) {return tail - head;}return tail + cap - head;
}
  • int skynet_mq_overload():获取并重置过载计数
int
skynet_mq_overload(struct message_queue *q) {if (q->overload) {int overload = q->overload;q->overload = 0;return overload;} return 0;
}

设计优势分析

  1. 三级队列结构
    在这里插入图片描述
  • 优势:
    • 全局队列:O(1)获取待处理服务
    • 服务队列:隔离不同服务的消息
    • 避免全局锁竞争
  1. 环形缓冲区设计
  • 内存布局:
[] 消息1 | 消息2 | ... | 消息N []
  • 优势:
    • 避免内存碎片
    • 动态扩容(翻倍策略)
    • 高效头尾指针操作
  1. 智能过载处理
// 动态阈值调整
while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2; // 指数退避
}
  • 优势:
    • 避免频繁触发过载检测
    • 指数退避减少检测开销
    • 提供过载监控接口
  1. 精细锁控制
  • 锁策略:
资源锁类型粒度
全局队列自旋锁全局
服务队列自旋锁单个服务
消息处理无锁-
  • 优势:
    • 服务间并行处理
    • 最小化临界区
  1. 优雅释放机制
    1. 标记阶段:skynet_mq_mark_release
    2. 清理阶段:skynet_mq_release
    3. 消息回调:通过drop_func处理残留消息
  • 优势:
    • 安全释放资源
    • 避免消息丢失
    • 支持自定义清理逻辑

工作流程详解

消息传递全流程

在这里插入图片描述

队列扩容流程

在这里插入图片描述

http://www.xdnf.cn/news/1016497.html

相关文章:

  • 目标检测标注格式
  • 对象映射 C# 中 Mapster 和 AutoMapper 的比较
  • 无人机侦测与反制技术进展
  • 精益数据分析(101/126):SaaS商业模式优化与用户生命周期价值提升策略
  • React 第六十一节 Router 中 createMemoryRouter的使用详解及案例注意事项
  • 【CSS-12】掌握CSS列表样式:从基础到高级技巧
  • 如何快速搭建门店系统?
  • 浅析MySQL数据迁移与恢复:从SQLServer转型到MySQL
  • 搭建网站应该怎样选择服务器?
  • 在mac上安装sh脚本文件
  • C++标准库大全(STL)
  • Spring Boot 集成国内AI,包含文心一言、通义千问和讯飞星火平台实战教程
  • 域名+nginx反向代理实现案例
  • Python学习笔记:错误和异常处理
  • 影像组学5:Radiomics Score的计算
  • 深度学习驱动的验证码识别实战:从原理到高并发工业部署
  • YOLOV11改进之多尺度扩张残差模块(MS-DRM)
  • [特殊字符][特殊字符] Harmony OS Next玩转多层级手势事件:当组件遇上“套娃”,触摸该怎么分家?
  • 北斗导航 | 基于matlab的卫星导航单点定位算法
  • Linux文件权限详解:从入门到精通
  • 每日Prompt:Steve Winter风格插画
  • 2.3 ASPICE的架构与设计
  • 服务器上安装配置vsftpd
  • Java流处理中的常见错误与最佳实践
  • 第八十篇 大数据开发基石:深入解析栈结构及其生活化应用(附全流程图解)
  • Cloud Events:事件驱动架构的未来标准化
  • 访问者模式:解耦数据结构与操作的优雅之道
  • 前端性能优化:打造极致用户体验
  • 洛谷:B3799 [NICA #1] 序列
  • 单片机,主循环和中断资源访问冲突的案例