【Linux高级全栈开发】2.3.1 协程设计原理与汇编实现2.3.2 协程调度器实现与性能测试
【Linux高级全栈开发】2.3.1 协程设计原理与汇编实现2.3.2 协程调度器实现与性能测试
一、协程基础概念与存在意义
协程定义
- 轻量级用户态线程,通过主动调度实现单线程内多任务协作,兼具同步编程逻辑与异步性能。
核心价值
- 解决同步 IO 阻塞问题,避免多线程高开销,适用于 IO 密集型场景(如网络请求、数据库操作)。
与线程对比
特性 协程 线程 创建成本 微秒级(极低) 毫秒级(较高) 内存占用 几 KB 几 MB 调度方式 非抢占式(主动让出) 抢占式(系统调度) 适用场景 IO 密集型 CPU 密集型 二、协程实现原理与关键技术
- 核心原语操作
create
:创建协程,分配栈空间并初始化上下文。resume
:恢复协程执行,切换至目标协程上下文。yield
:主动让出执行权,保存当前协程状态并切换至调度器。switch
:底层上下文切换,保存 / 恢复寄存器状态(如 EIP、ESP、通用寄存器)。- 上下文切换实现方式
setjmp/longjmp
:跨平台但破坏栈结构,适用于简单场景。ucontext
:系统级上下文管理,通过swapcontext
实现切换,移植性受限。- 汇编实现:直接操作寄存器(如 x86 的 EIP、ESP),性能高但开发难度大。
- 寄存器操作关键逻辑
- 保存当前状态:将
ESP
(栈指针)、EBP
(基址指针)、EIP
(指令指针)及通用寄存器值存入协程上下文结构体。- 恢复目标状态:从目标协程上下文中读取寄存器值,通过
ret
指令跳转至目标EIP
。三、协程调度器设计与实现
调度器核心功能
- 管理协程生命周期(就绪、睡眠、等待状态),通过事件驱动(如 epoll)处理 IO 就绪事件,触发协程恢复。
数据结构设计
- 就绪队列(Ready Queue):存储可执行协程,采用双向链表实现快速入队 / 出队。
- 睡眠树(Sleep Tree):基于红黑树管理超时协程,按剩余时间排序。
- 等待树(Wait Tree):通过红黑树关联 IO 事件(如文件描述符)与协程,结合 epoll 检测事件就绪。
执行流程
while (1) {// 处理超时协程expire_co = sleep_tree_check();if (expire_co) add_to_ready_queue(expire_co);// 监听IO事件nready = epoll_wait(epfd, events, MAX_EVENTS);for (i=0; i<nready; i++) {wait_co = wait_tree_search(events[i].fd);if (wait_co) add_to_ready_queue(wait_co);}// 调度就绪协程while ((ready_co = pop_from_ready_queue())) {resume(ready_co); // 切换至协程上下文执行} }
四、协程性能优化与测试
- 性能优化方向
- 栈空间管理:采用固定大小栈(如 4KB)或动态扩展栈,减少内存碎片。
- 多核支持:多线程 + 协程模式,每个线程独立维护调度器,通过 CPU 亲和性绑定核心。
- Hook 机制:对系统调用(如
read/write
)进行钩子拦截,实现异步化改造。- 性能测试案例
- 百万并发测试:使用
NtyCo
框架在 4 核 6G 服务器上创建 100 万协程,单连接栈空间 4KB,无内存溢出且正常收发数据。- 对比指标:同步模式(6500ms/1000 连接) vs 异步协程模式(900ms/1000 连接),响应时间提升约 86%。
五、协程在项目中的应用与简历描述
- 典型应用场景
- 高并发 Web 服务器(如 Nginx 协程扩展)、异步数据库访问(如 MySQL 请求非阻塞处理)、微服务架构中的异步通信。
- 简历撰写示例
- 项目名称:高并发图床服务
- 技术点:使用
ntyco
协程框架实现百万级连接处理,通过 epoll + 协程非阻塞模型提升 IO 效率,对比传统线程模型吞吐量提升 300%。- 职责:设计协程调度器,优化上下文切换性能;集成 Redis 缓存,通过协程异步预加载提升响应速度。
六、扩展与进阶
- 协程与 DPDK 结合:在用户态协议栈中使用协程处理网络包,减少内核态上下文切换开销。
- 语言层协程支持:如 Go 的 Goroutine、Python 的
asyncio
,底层均基于类似原理实现。
2.3.1 协程设计原理与汇编实现(9个问题)
1 为什么要有协程?协程存在的 3 个原因
-
协程存在的原因解决的问题:如何把同步的
recv/send
改为异步的recv/send
-
以Linux 为例,在这里需要介绍一个“网红 ”就是 epoll。服务器使用 epoll 管理 百万计的客户端长连接,代码框架如下:
while (1) {int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);for (i = 0;i < nready;i ++) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {int connfd = accept(listenfd, xxx, xxxx);setnonblock(connfd);ev.events = EPOLLIN | EPOLLET;ev.data.fd = connfd;epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);} else {handle(sockfd);}} }
-
对于服务器处理百万计的 IO。Handle(sockfd)实现方式有两种。
- *第一种*,handle(sockfd)函数内部对 sockfd 进行读写动作。handle 的 io 操作(send,recv)与 epoll_wait 是在同一个处理流程里面的。 这就是 IO 同步操作。每 1000 个连接接入的服务器响应时间(6500ms 左右)。
- *第二种*,handle(sockfd)函数内部将 sockfd 的操作,push 到线程池中,Handle 函数是将 sockfd 处理方式放到另一个已经其他的线程中运行,如此做 法,将 io 操作(recv,send)与 epoll_wait 不在一个处理流程里面,使得 io 操作(recv,send)与 epoll_wait 实现解耦。这就叫做 IO 异步操作。IO 异步操作,每 1000 个连接接入的服务器响应时间(900ms 左右)。
-
优点:
- 子模块好规划。
- 程序性能高。
-
缺点:正因为子模块好规划,使得模块之间的 sockfd 的管理异常麻烦。每一个子线程 都需要管理好 sockfd,避免在 IO 操作的时候,sockfd 出现关闭或其他异常。
-
-
有没有一种方式,有异步性能,同步的代码逻辑。来方便编程人员对 IO 操作的 组件呢? 有,采用一种轻量级的协程来实现。在每次 send 或者 recv 之前进行 切换,再由调度器来处理 epoll_wait 的流程。
-
传统同步编程在遇到 IO 操作(如网络请求、文件读写 )时,线程会阻塞等待,期间 CPU 闲置,浪费资源。多线程虽可并发处理任务,但线程上下文切换开销大,且存在同步、互斥等复杂问题。协程作为轻量级用户态线程,能在单线程内实现多个任务的协作式调度,避免阻塞等待,以同步编程方式获得异步性能,适合 IO 密集型场景。
-
同步与异步性能:形容两者之间的关系,同步比异步要慢:同步编程模式下,程序按顺序执行,在某些操作(如 I/O 操作)等待时会阻塞,浪费 CPU 时间。异步编程可让程序在等待时去处理其他任务,提高效率。协程是异步编程的一种实现方式,能更好地平衡同步和异步的性能,避免不必要的等待和资源浪费 。
- 同步执行示例:
//dns_client_commit(argv[1]); int count = sizeof(domain) / sizeof(domain[0]); int i = 0;for (i = 0;i < count;i ++) {dns_client_commit(domain[i]); }
-
异步执行示例:
struct async_context *ctx = dns_async_client_init(); if (ctx == NULL) return -2; int count = sizeof(domain) / sizeof(domain[0]); int i = 0; for (i = 0;i < count;i ++) {dns_async_client_commit(ctx, domain[i], dns_async_client_result_callback); //sleep(2);} getchar();
-
服务端异步处理:在服务端,大量请求到来时,若采用传统同步处理,每个请求都要等待前面的处理完,会造成性能瓶颈。协程可使服务端在处理一个请求的耗时操作(如数据库查询)时,切换去处理其他请求,充分利用资源,提升服务端并发处理能力。
-
把检测是否就绪的
epoll_wait
与读写io
的recv/send
不放到一个流程中,把检测到的io丢到线程池里去读写:if (nRun) {printf(" New Data is Comming\n");client_data_process(clientfd); } else {// epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);client_t *rClient = (client_t*)malloc(sizeof(client_t));memset(rClient, 0, sizeof(client_t)); rClient->fd = clientfd;job_t *job = malloc(sizeof(job_t));job->job_function = client_job;job->user_data = rClient;workqueue_add_job(&workqueue, job); }//993ms
while(1){ int nready = epoll_wait(); for(i=0;i<nready;i ++){recv(event[i].fd,buffer);send(event[i].fd,buffer); }
-
检测与读写io在一个流程中:
client_data_process(clientfd); // 5600ms
void *task_callback(void *arg{recv(event[il.fd, buffer);send(event[i].fd, buffer); } while(1){int nready = epoll_wait();for(i=0; i<nready; i ++){task.fd = event[i].fd;task.callback = task_callback;push_task_to_threadpool(&task);} }
-
-
客户端异步请求:客户端发起网络请求等操作时,可能有数十个接口需要去请求,若用同步方式,请求会是一种串行,一个崩溃的话,后续界面可能会卡住。协程可实现异步请求,在等待响应时,请求是并行的,客户端界面仍可响应用户操作,提升用户体验 。
-
异步请求应用场景速览
- DNS:非阻塞解析域名,避免单个查询阻塞整个服务(如 Node.js
dns.promises
)。 - HTTP:高并发请求(如爬虫、微服务调用),单线程处理数千连接(如 Python
aiohttp
、JavaScriptfetch
)。 - Redis:异步读写缓存 / 队列,提升吞吐量(如 Python
redis.asyncio
、Node.jsioredis
)。 - Kafka:异步生产 / 消费消息,支持百万级 TPS(如 Python
aiokafka
、JavaKafkaConsumer
异步 API)。 - MongoDB:异步数据库操作,减少 IO 等待(如 Python
motor
、Node.jsmongoose
异步方法)。 - 抖音
- 视频播放页:主视频流分片加载的同时,预加载推荐视频列表、加载评论区内容以及实时更新互动数据等。
- 直播场景:视频流拉取的同时,通过长连接实时推送弹幕消息、实时刷新观众列表以及加载主播信息和商品橱窗等。
- 微信
- 朋友圈浏览:图文内容瀑布流加载时,预加载好友头像及个人信息,同时同步点赞与评论数据。
- 群聊场景:批量拉取历史消息的同时,实时更新在线成员状态,预加载图片或视频缩略图。
- 淘宝
- 商品详情页:加载商品主图及详情图的同时,实时查询价格与库存,获取用户评价并加载推荐商品。
- 购物车结算:多商品价格计算与库存锁定验证同时进行,查询收货地址与配送方式,计算支付优惠。
- 美团
- 商家详情页:菜品图片与价格请求的同时,加载用户评价与评分,查询商家位置与配送范围。
- 即时配送:实时更新骑手位置与订单状态轮询同时进行,规划地图路径并确认支付状态。
- DNS:非阻塞解析域名,避免单个查询阻塞整个服务(如 Node.js
-
异步的缺点:代码逻辑比较复杂,不符合人的思维逻辑,比如回调函数嵌套编写,很难理解
-
协程的核心价值:(同步编程,异步性能)轻量级线程,由程序主动调度(非操作系统内核),适合 IO 密集型场景。通过同步风格编写异步代码(如 Python
async/await
、Gogoroutine
),避免回调地狱,显著提升并发效率。协程(Coroutine)是一种比线程更加轻量级的并发编程模型,它允许程序在执行过程中暂停和恢复,从而高效地处理异步操作。 -
协程的核心原理
协程的实现依赖于以下技术:
- 非抢占式调度:协程主动让出控制权(通过
yield
、await
等)。 - 状态保存:暂停时保存局部变量和执行位置。
- 事件循环:管理协程的调度和恢复(如 Python 的
asyncio
循环)。
- 非抢占式调度:协程主动让出控制权(通过
-
协程 vs 线程
特性 协程 线程 创建成本 极低(微秒级) 较高(毫秒级) 内存占用 几 KB 几 MB 调度方式 非抢占式(主动让出) 抢占式(系统调度) 适用场景 IO 密集型任务(如网络请求) CPU 密集型任务
2 协程实现过程及原语操作
协程如何使用?与线程使用有何区别?
每请求每线程的代码如下:
while(1) {socklen_t len = sizeof(struct sockaddr_in);int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);pthread_t thread_id;pthread_create(&thread_id, NULL, client_cb, &clientfd);}
协程参考代码如下:
while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}
线程的API思维来使用协程,函数调用的性能来测试协程。
NtyCo 封装出来了若干接口,一类是协程本身的,二类是 posix 的异步封装 协程 API:while
-
协程创建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
-
协程调度器的运行
void nty_schedule_run(void)
-
POSIX 异步封装 API:
int nty_socket(int domain, int type, int protocol) int nty_accept(int fd, struct sockaddr *addr, socklen_t *len) int nty_recv(int fd, void *buf, int length) int nty_send(int fd, const void *buf, int length) int nty_close(int fd)
协程的实现之工作流程
*3.1* *创建协程*
当我们需要异步调用的时候,我们会创建一个协程。比如 accept 返回一个新的sockfd,创建一个客户端处理的子过程。再比如需要监听多个端口的时候,创建一个server 的子过程,这样多个端口同时工作的,是符合微服务的架构的。
创建协程的时候,进行了如何的工作?创建 API 如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
- 参数 1 :nty_coroutine **new_co,需要传入空的协程的对象,这个对象是由内部 创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。
- 参数 2:proc_coroutine func,协程的子过程。当协程被调度的时候,就会执行该 函数。
- 参数 3:void *arg,需要传入到新协程中的参数。
*3.2* *实现* *IO* *异步操作*
大部分的朋友会关心 IO 异步操作如何实现,在 send 与 recv 调用的时候,如何实现异 步操作的。
先来看一下一段代码:
while (1) {
int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);for (i = 0;i < nready;i ++) {int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
int connfd = accept(listenfd, xxx, xxxx);setnonblock(connfd);ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);} else {epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL); recv(sockfd, buffer, length, 0);//parser_proto(buffer, length);send(sockfd, buffer, length, 0);
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL); }
}
}
在进行 IO 操作(recv,send)之前,先执行了 epoll_ctl 的 del 操作,将相应的 sockfd 从 epfd 中删除掉,在执行完 IO 操作(recv,send)再进行 epoll_ctl 的 add 的动作。这段代码看起来 似乎好像没有什么作用。
如果是在多个上下文中,这样的做法就很有意义了。能够保证 sockfd 只在一个上下文中 能够操作 IO 的。不会出现在多个上下文同时对一个 IO 进行操作的。协程的 IO 异步操作正 式是采用此模式进行的。
把单一协程的工作与调度器的工作的划分清楚,先引入两个原语操作 resume
,yield
,yield 就是让出运行,resume 就是恢复运行。调度器与协程的上下文切换如下图所示
在协程的上下文 IO 异步操作(nty_recv ,nty_send
)函数,步骤如下:
- 将 sockfd 添加到 epoll 管理中。
- 进行上下文环境切换,由协程上下文 yield 到调度器的上下文。
- 调度器获取下一个协程上下文。Resume 新的协程 IO 异步操作的上下文切换的时序图如下:
协程的核心原语操作:
-
create:创建一个协程:
-
调度器是否存在,不存在也创建。调度器作为全局的单例。将调度 器的实例存储在线程的私有空间 pthread_setspecific。
-
分配一个 coroutine 的内存空间,分别设置 coroutine 的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。
-
将新分配协程添加到就绪队列 ready_queue 中 实现代码如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func,void *arg) {assert(pthread_once(&sched_key_once,nty_coroutine_sched_key_creator) == 0);nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) {nty_schedule_create(0);sched = nty_coroutine_get_sched();if (sched == NULL) {printf("Failed to create scheduler\n");return -1;}}nty_coroutine *co = calloc(1, sizeof(nty_coroutine));if (co == NULL) {printf("Failed to allocate memory for new coroutine\n"); return -2;}//int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);if (ret) {printf("Failed to allocate stack for new coroutine\n"); free(co);return -3; }co->sched = sched;co->stack_size = sched->stack_size;co->status = BIT(NTY_COROUTINE_STATUS_NEW); //co->id = sched->spawned_coroutines ++;co->func = func;co->fd = -1;co->events = 0;co->arg = arg;co->birth = nty_coroutine_usec_now();*new_co = co;TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
-
yield: 让出 CPU。
oid nty_coroutine_yield(nty_coroutine *co)
参数:当前运行的协程实例
调用后该函数不会立即返回,而是切换到最近执行 resume 的上下文。该函 数返回是在执行 resume 的时候,会有调度器统一选择 resume 的,然后再 次调用 yield 的。resume 与 yield 是两个可逆过程的原子操作。
-
resume:恢复协程的运行权
nty_coroutine_resume(nty_coroutine *co)
参数:需要恢复运行的协程实例
调用后该函数也不会立即返回,而是切换到运行协程实例的 yield 的位 置。返回是在等协程相应事务处理完成后,主动 yield 会返回到 resume 的地 方。
-
-
switch:用于协程间的切换,保存当前协程的执行状态(如寄存器的值、程序计数器等),并恢复目标协程的执行状态,让程序执行流从一个协程转移到另一个协程 。
-
resume:用于恢复一个挂起的协程的执行,将协程从暂停状态切换到运行状态,继续执行协程内后续的代码 。
-
yield:表示当前协程主动让出执行权,将控制权交给调度器或其他协程,自身进入暂停状态,待合适时机再被唤醒继续执行 。
3 协程切换的三种实现方式
原语操作相关:
setjmp/longjmp:
-
提供低级非局部跳转,
setjmp
保存当前上下文,longjmp
恢复并跳转。但使用时需注意保证协程栈空间已建立且未退出,且可能影响程序可维护性和可读性 。#include <stdio.h> #include <setjmp.h>jmp_buf env; // 用于存储程序上下文环境的缓冲区// 测试函数,接收一个整数参数 void func(int arg) {printf("Calling func with arg = %d\n", arg); // 打印当前参数值longjmp(env, ++arg); // 跳回到setjmp处,并将arg+1作为返回值 }int main() { // 程序入口点int ret = setjmp(env); // 设置跳转点,首次执行返回0printf("Returned from longjmp with value: %d\n", ret); // 打印返回值if (ret == 0) {printf("First call (ret == 0)\n"); // 首次调用分支func(ret); // 调用func函数,触发longjmp} else if (ret == 1) {printf("Second call (ret == 1)\n"); // 第二次调用分支func(ret); // 再次调用func函数} else if (ret == 2) {printf("Third call (ret == 2)\n"); // 第三次调用分支func(ret); // 第三次调用func函数} else {printf("Final call (ret >= 3)\n"); // 最终调用分支printf("Exiting program.\n"); // 输出退出信息}return 0; // 程序正常结束 }
ucontext:
-
在类 Unix 系统(如 Linux )中用于实现协程和用户态线程,通过保存和恢复 CPU 寄存器、堆栈指针等状态,允许程序在不同执行流间切换。主要函数有
swapcontext(ucontext_t *oucp, const ucontext_t *ucp)
,用于保存当前上下文到oucp
,并切换到ucp
上下文 。#include <ucontext.h> // 定义两个ucontext_t类型的变量,用于存储协程上下文 ucontext_t ctx[2]; // 定义主上下文变量,用于存储主程序的上下文 ucontext_t main_ctx; // 定义计数器,用于控制循环次数 int count = 0; // 协程函数func1,无参数,返回值为void void func1(void) { // 当count小于20时循环执行while (count ++ < 20) { printf("1\n"); // 将当前协程(ctx[0])的上下文切换到另一个协程(ctx[1])的上下文swapcontext(&ctx[0], &ctx[1]); printf("3\n"); } }// 协程函数func2,无参数,返回值为void void func2(void) { char a[4096] = {0}; // 当count小于20时循环执行while (count ++ < 20) { printf("2\n"); // 将当前协程(ctx[1])的上下文切换到另一个协程(ctx[0])的上下文swapcontext(&ctx[1], &ctx[0]); printf("4\n"); } }int main() {char stack1[2048] = {0};char stack2[2048] = {0};// 获取当前上下文并保存到ctx[0]中getcontext(&ctx[0]); // 设置ctx[0]对应的栈空间起始地址ctx[0].uc_stack.ss_sp = stack1; // 设置ctx[0]对应的栈空间大小ctx[0].uc_stack.ss_size = sizeof(stack1); // 设置ctx[0]的链接上下文为主程序上下文main_ctxctx[0].uc_link = &main_ctx; // 创建一个新的上下文环境,指定执行函数为func1,无参数makecontext(&ctx[0], func1, 0); // 获取当前上下文并保存到ctx[1]中getcontext(&ctx[1]); // 设置ctx[1]对应的栈空间起始地址ctx[1].uc_stack.ss_sp = stack2; // 设置ctx[1]对应的栈空间大小ctx[1].uc_stack.ss_size = sizeof(stack2); // 设置ctx[1]的链接上下文为主程序上下文main_ctxctx[1].uc_link = &main_ctx; // 创建一个新的上下文环境,指定执行函数为func2,无参数makecontext(&ctx[1], func2, 0); printf("swapcontext\n");// 将主程序上下文切换到ctx[0]对应的协程上下文,开始执行func1swapcontext(&main_ctx, &ctx[0]); printf("\n");return 0; }
这段 C 代码主要利用
ucontext.h
头文件提供的函数来实现简单的协程切换功能。ucontext_t
类型用于存储程序的上下文信息,包括寄存器状态、栈指针等。通过getcontext
获取当前上下文,makecontext
设置上下文要执行的函数等信息,swapcontext
实现上下文之间的切换。代码定义了两个协程函数
func1
和func2
,在main
函数中设置好它们的上下文环境后,通过swapcontext
进行切换,使得两个协程函数交替执行,在循环条件(count < 20
)满足的情况下,打印出特定的字符序列,从而实现协程之间的协作式多任务处理 。输出为:
1 2 3 1 4 2 3 1 4 2 3 。。。
-
- 更复杂的调度器代码
#define _GNU_SOURCE #include <dlfcn.h> #include <stdio.h> #include <ucontext.h> #include <string.h> #include <unistd.h> #include <fcntl.h>ucontext_t ctx[3]; // 用于存储三个协程的上下文信息 ucontext_t main_ctx; // 用于存储主程序的上下文信息 int count = 0; // 计数器,用于控制协程循环执行的次数// hook相关类型定义 typedef ssize_t (*read_t)(int fd, void *buf, size_t count); read_t read_f = NULL; // 用于存储原始的read函数指针typedef ssize_t (*write_t)(int fd, const void *buf, size_t count); write_t write_f = NULL; // 用于存储原始的write函数指针// 重定义read函数,实现对read操作的hook ssize_t read(int fd, void *buf, size_t count) {ssize_t ret = read_f(fd, buf, count); // 调用原始的read函数进行读取操作printf("read: %s\n", (char *)buf); // 打印读取到的内容return ret; // 返回读取的字节数 }// 重定义write函数,实现对write操作的hook ssize_t write(int fd, const void *buf, size_t count) {printf("write: %s\n", (const char *)buf); // 打印要写入的内容return write_f(fd, buf, count); // 调用原始的write函数进行写入操作 }void init_hook(void) {if (!read_f) {// 通过dlsym获取下一个(原始的)read函数指针read_f = dlsym(RTLD_NEXT, "read"); }if (!write_f) {// 这里存在错误,应该获取write函数指针,却错误地获取read函数指针write_f = dlsym(RTLD_NEXT, "read"); } }// 协程函数1 void func1(void) {while (count ++ < 30) { // 当count小于30时循环执行printf("1\n");// 将当前协程(ctx[0])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[0], &main_ctx); printf("4\n");} }// 协程函数2 void func2(void) {while (count ++ < 30) {printf("2\n");// 将当前协程(ctx[1])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[1], &main_ctx); printf("5\n");} }// 协程函数3 void func3(void) {while (count ++ < 30) {printf("3\n");// 将当前协程(ctx[2])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[2], &main_ctx); printf("6\n");} }int main() {init_hook(); // 初始化函数hookint fd = open("a.txt", O_CREAT | O_RDWR); // 以读写模式创建文件a.txtif (fd < 0) {return -1; // 打开文件失败则返回-1}char *str = "1234567890";write(fd, str, strlen(str)); // 向文件中写入字符串char buffer[128] = {0};read(fd, buffer, 128); // 从文件中读取内容到bufferprintf("buffer: %s\n", buffer); // 打印读取到的内容#if 1char stack1[2048] = {0};char stack2[2048] = {0};char stack3[2048] = {0};// 获取并设置第一个协程的上下文getcontext(&ctx[0]);ctx[0].uc_stack.ss_sp = stack1;ctx[0].uc_stack.ss_size = sizeof(stack1);ctx[0].uc_link = &main_ctx;makecontext(&ctx[0], func1, 0);// 获取并设置第二个协程的上下文getcontext(&ctx[1]);ctx[1].uc_stack.ss_sp = stack2;ctx[1].uc_stack.ss_size = sizeof(stack2);ctx[1].uc_link = &main_ctx;makecontext(&ctx[1], func2, 0);// 获取并设置第三个协程的上下文getcontext(&ctx[2]);ctx[2].uc_stack.ss_sp = stack3;ctx[2].uc_stack.ss_size = sizeof(stack3);ctx[2].uc_link = &main_ctx;makecontext(&ctx[2], func3, 0);printf("swapcontext\n");while (count <= 30) { // 调度器循环// 切换到对应的协程上下文执行,实现协程轮流执行swapcontext(&main_ctx, &ctx[count%3]); }printf("\n"); #endifreturn 0; }
- 整体功能:
- 这段代码综合了函数 hook 和协程调度的功能。通过函数 hook,重定义了
read
和write
函数,使得在进行文件读写操作时能够打印出读写的内容。同时,利用ucontext.h
中的函数实现了三个协程(func1
、func2
、func3
),每个协程在满足一定循环条件下打印特定字符并进行上下文切换。在main
函数中,通过调度器逻辑(while (count <= 30)
循环以及swapcontext
调用)来轮流执行这三个协程,实现了协程间的协作式多任务处理。 - 调度器的作用:
- 调度器(在代码中就是
main
函数里的while (count <= 30)
循环部分)的存在是为了控制多个协程的执行顺序和时机。协程本身只是一段代码逻辑,它们不会自动按照期望的方式交替执行。通过调度器,利用swapcontext
函数,可以有规律地切换到不同的协程上下文,让各个协程轮流获得执行机会,从而实现多个任务(这里是三个协程函数的执行)之间的协作运行,避免某个协程一直占用执行资源,提高程序的并发处理能力和资源利用率 。 此外,调度器还可以根据特定的条件(如这里的count
值)来决定协程的执行次数和何时终止等逻辑 。 同时,结合前面的函数 hook 功能,使得在协程执行过程中涉及文件读写操作时,能够按照 hook 的逻辑打印出相关信息,整体上实现了一个较为复杂的功能组合 。 不过代码中init_hook
函数里获取write_f
时存在错误,应该修正为write_f = dlsym(RTLD_NEXT, "write");
。
汇编实现:
- 直接通过汇编语言操作寄存器等底层硬件资源来实现协程切换。优点是可以精确控制切换过程,性能更高;缺点是编写难度大,对开发者汇编语言和计算机底层知识要求高
三者优缺点对比
实现方式 | 优点 | 缺点 |
---|---|---|
setjmp/longjmp | - 跨平台性较好,在不同操作系统的 C 环境中基本可用。 - 提供了一种底层的非局部跳转机制,能实现复杂控制流或异常处理。 | - 代码实现相对复杂,使用不当易导致难以追踪的问题。 - 缺乏对栈管理等底层细节的自动处理,需开发者手动保障协程栈空间的合理存在,容易出错。 - 破坏了正常函数调用栈的结构和执行逻辑,使代码的可读性和可维护性变差 。 |
ucontext | - 实现方式相对简洁,通过相关函数(如getcontext 、setcontext 等 )可方便地保存和恢复 CPU 寄存器、堆栈指针等状态。 - 适用于实现协程和轻量级任务调度,功能较为通用。 | - 跨平台性一般,不同操作系统对ucontext 相关函数的支持和实现细节可能存在差异,移植性受限。 - 虽然使用比setjmp/longjmp 方便,但仍需开发者对底层执行流切换有一定了解,存在误用风险。 |
汇编实现 | - 性能较高,能直接操作寄存器等硬件资源,减少不必要开销,实现高效的上下文切换。 - 可定制性强,能针对特定需求精细控制切换过程。 | - 不同体系结构(如 x86、ARM 等)汇编代码差异大,需为不同硬件平台单独编写和维护代码,跨平台能力弱。 - 对开发者要求高,需熟悉汇编语言和底层硬件知识,开发难度大,代码可读性和可维护性差。 |
4 汇编实现 - 寄存器讲解
- 基本概念
在 CPU 中,寄存器用于临时存储数据和指令执行的中间结果等。不同寄存器有不同用途,比如通用寄存器(如 x86 架构下的eax
、ebx
等)可用于算术运算、数据存储等。当进行协程切换时,需要把当前协程使用的寄存器值保存起来(store
操作),切换到另一个协程时再恢复这些寄存器的值(load
操作) 。
上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别 mov 到相对应的寄存器上。此时上下文完成切换。如下图所示:
首先来回顾一下 x86_64 寄存器的相关知识。汇编与寄存器相关知识还会在《协程的实 现之切换》继续深入探讨的。x86_64 的寄存器有 16 个 64 位寄存器,分别是:
%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15
-
%rax
作为函数返回值使用的。 -
%rsp
栈指针寄存器,指向栈顶 -
%rdi, %rsi, %rdx, %rcx, %r8, %r9
用作函数参数,依次对应第 1 参数,第 2 参数。。。%rbx, %rbp, %r12, %r13, %r14, %r15
用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改%r10, %r11
用作数据存储,就是使用前要先保存原值 -
代码解释
-
store 操作代码
mov eax (co_a.a);
mov ebx (co_a.b);
mov eax (co_a.a)
:将内存中co_a.a
位置的值加载到eax
寄存器中。
mov ebx (co_a.b)
:类似地,将co_a.b
位置的值加载到ebx
寄存器。
- load 操作代码
mov (co_b.a) eax;
mov (co_b.b) ebx;
mov (co_b.a) eax
:将eax
寄存器中的值存储到内存中co_b.a
的位置。这里是在恢复协程 b 的上下文,把之前保存(可能是从另一个协程切换过来时保存的)到寄存器的值,放回协程 b 对应的内存位置。
mov (co_b.b) ebx
:同理,将ebx
寄存器的值存储到co_b.b
位置。
-
寄存器作用
-
eax
寄存器:在上述代码中,它先用于临时存储协程 a 的某个数据(co_a.a
),在切换到协程 b 时,又用于把这个值恢复到协程 b 对应的位置(co_b.a
) 。它是数据传递和保存恢复的一个重要载体。 -
ebx
寄存器:和eax
类似,用于存储协程 a 的另一个数据(co_a.b
),并在切换时恢复到协程 b 的对应位置(co_b.b
) 。
-
-
完整的简单汇编代码示例(x86 架构,简化示意)
section.data
co_a:a dd 0b dd 0
co_b:a dd 0b dd 0section.text
global _start_start:
; store操作示例,保存协程a部分上下文mov eax, [co_a + 0] ; 假设co_a.a在偏移0处mov ebx, [co_a + 4] ; 假设co_a.b在偏移4处; 这里可以想象进行协程切换的逻辑; load操作示例,恢复协程b部分上下文mov [co_b + 0], eaxmov [co_b + 4], ebxmov eax, 1 ; 系统调用号,退出程序xor ebx, ebx ; 返回值0int 0x80 ; 触发系统调用
上述代码在数据段定义了两个类似协程结构体的变量co_a
和co_b
,在代码段简单演示了 store 和 load 操作过程。实际应用中,协程切换还需处理栈指针、标志寄存器等更多内容,并且要和高级语言代码配合使用。
5 协程初始启动 - eip 寄存器设置
以 NtyCo 的实现为例,来分析这个过程。CPU 有一个非常重要的寄存器叫做 EIP,用来 存储 CPU 运行下一条指令的地址。我们可以把回调函数的地址存储到 EIP 中,将相应的参数 存储到相应的参数寄存器中。实现子过程调用的逻辑代码如下:
void _exec(nty_coroutine *co) {co->func(co->arg); //子过程的回调函数 }void nty_coroutine_init(nty_coroutine *co) {//ctx 就是协程的上下文co->ctx.edi = (void*)co; //设置参数co->ctx.eip = (void*)_exec; //设置回调函数入口//当实现上下文切换的时候,就会执行入口函数_exec , _exec 调用子过程 func
}
EIP 寄存器的设置是协程切换的核心环节,通过以下步骤实现执行流的转移:
- 保存当前 EIP:将当前协程的指令指针存入上下文结构体。
- 恢复目标 EIP:从目标协程的上下文结构体中读取 EIP,并通过栈顶和
ret
指令间接设置到 CPU 的 EIP 寄存器中。 - 跳转执行:
ret
指令触发执行流跳转到目标 EIP 指向的指令,完成协程切换。
- 代码功能总结
代码主要实现了一个用于协程切换的汇编函数_switch
,运行在 x86 架构下 。其核心功能是保存当前协程的上下文(栈指针esp
、基址指针ebp
、指令指针eip
以及通用寄存器ebx
、esi
、edi
等的值 ),并恢复目标协程的上下文,从而实现协程之间的切换。
- 代码注释详解
切换_switch 函数定义:
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
参数 1:即将运行协程的上下文,寄存器列表
参数 2:正在运行协程的上下文,寄存器列表
#ifdef __i386__
__asm__ (
" .text \n"
" .p2align 2,,3 \n"
".globl _switch \n"
"_switch: \n"
"__switch: \n"
// 将当前栈指针 + 8 位置处的值(应该是指向当前协程上下文结构体的指针)加载到 %edx 寄存器
"movl 8(%esp), %edx # fs->%edx \n"
// 将当前栈指针 %esp 的值保存到当前协程上下文结构体的 0 偏移位置(用于保存当前协程的栈指针)
"movl %esp, 0(%edx) # save esp \n"
// 将基址指针 %ebp 的值保存到当前协程上下文结构体的 4 偏移位置(用于保存当前协程的基址指针)
"movl %ebp, 4(%edx) # save ebp \n"
// 将当前栈顶的值(应该是返回地址,即指令指针 %eip )加载到 %eax 寄存器
"movl (%esp), %eax # save eip \n"
// 将指令指针 %eip 的值保存到当前协程上下文结构体的 8 偏移位置
"movl %eax, 8(%edx) \n"
// 分别将通用寄存器 %ebx、%esi、%edi 的值保存到当前协程上下文结构体对应的偏移位置
"movl %ebx, 12(%edx) # save ebx,esi,edi \n"
"movl %esi, 16(%edx) \n"
"movl %edi, 20(%edx) \n"
// 将目标协程上下文结构体的指针(从栈上获取,当前栈指针 + 4 位置处的值 )加载到 %edx 寄存器
"movl 4(%esp), %edx # ts->%edx \n"
// 从目标协程上下文结构体中恢复通用寄存器 %edi、%esi、%ebx 的值
"movl 20(%edx), %edi # restore ebx,esi,edi \n"
"movl 16(%edx), %esi \n"
"movl 12(%edx), %ebx \n"
// 从目标协程上下文结构体中恢复栈指针 %esp 的值
"movl 0(%edx), %esp # restore esp \n"
// 从目标协程上下文结构体中恢复基址指针 %ebp 的值
"movl 4(%edx), %ebp # restore ebp \n"
// 从目标协程上下文结构体中恢复指令指针 %eip 的值到 %eax 寄存器
"movl 8(%edx), %eax # restore eip \n"
// 将恢复后的指令指针 %eip 的值设置到栈顶,为函数返回后跳转到目标协程的正确位置做准备
"movl %eax, (%esp) \n"
// 函数返回,此时程序执行流将切换到目标协程的上下文环境继续执行
"ret \n"
); #ifndef _USE_UCONTEXT
typedef struct _nty_cpu_ctx {void *esp; // 用于存储栈指针void *ebp; // 用于存储基址指针void *eip; // 用于存储指令指针void *edi; // 用于存储通用寄存器 %edi 的值void *esi; // 用于存储通用寄存器 %esi 的值void *ebx; // 用于存储通用寄存器 %ebx 的值void *r1; // 预留,可能用于扩展存储其他寄存器值或数据void *r2; // 预留,可能用于扩展存储其他寄存器值或数据void *r3; // 预留,可能用于扩展存储其他寄存器值或数据void *r4; // 预留,可能用于扩展存储其他寄存器值或数据void *r5; // 预留,可能用于扩展存储其他寄存器值或数据
} nty_cpu_ctx;
#endif
按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi保存第二 个参数的值,即保存 cur_ctx 的值。X86_64 每个寄存器是 64bit,8byte。
- Movq %rsp, 0(%rsi) 保存在栈指针到 cur_ctx 实例的 rsp 项
- Movq %rbp, 8(%rsi)
- Movq (%rsp), %rax #将栈顶地址里面的值存储到 rax 寄存器中。Ret 后出栈,执行栈顶 Movq %rbp, 8(%rsi) #后续的指令都是用来保存 CPU 的寄存器到 new_ctx 的每一项中
- Movq 8(%rdi), %rbp #将 new_ctx 的值
- Movq 16(%rdi), %rax #将指令指针 rip 的值存储到 rax 中
- Movq %rax, (%rsp) # 将存储的 rip 值的 rax 寄存器赋值给栈指针的地址的值。 Ret # 出栈,回到栈指针,执行 rip 指向的指令。
上下文环境的切换完成。
6 协程栈空间定义 - 独立栈与共享栈的做法
*问题:协程如何定义?* *调度器如何定义?*
先来一道设计题:
设计一个协程的运行体 R 与运行体调度器 S 的结构体
- 运行体 R:包含运行状态{就绪,睡眠,等待},运行体回调函数, 回调参数,栈指针,栈大小,当前运行体
- 调度器 S:包含执行集合{就绪,睡眠,等待}
这道设计题拆分两个个问题,一个运行体如何高效地在多种状态集合更换。调度器与运行体的功能界限。
运行体如何高效地在多种状态集合更换
- 新创建的协程,创建完成后,加入到就绪集合,等待调度器的调度;协程 在运行完成后,进行 IO 操作,此时 IO 并未准备好,进入等待状态集合;IO 准备就绪,协程开始运行,后续进行 sleep 操作,此时进入到睡眠状态集合。
- 就绪(ready),睡眠(sleep),等待(wait)集合该采用如何数据结构来存储?
- 就绪(ready)集合并不没有设置优先级的选型,所有在协程优先级一致,所以可以使用队列来存储就绪的协程,简称为就绪队列(ready_queue)。
- 睡眠(sleep)集合需要按照睡眠时长进行排序,采用红黑树来存储,简称睡眠树(sleep_tree)红黑树在工程实用为<key, value>, key 为睡眠时长,
- value 为对应的协程结点。
- 等待(wait)集合,其功能是在等待 IO 准备就绪,等待 IO 也是有时长的, 所以等待(wait)集合采用红黑树的来存储,简称等待树(wait_tree),此处借 鉴 nginx 的设计。
数据结构如下图所示:
Coroutine 就是协程的相应属性,status 表示协程的运行状态。sleep 与 wait 两颗红黑树,ready 使用的队列,比如某协程调用 sleep 函数,加入睡 眠树(sleep_tree),status |= S 即可。比如某协程在等待树(wait_tree) 中,而 IO 准备就绪放入 ready 队列中,只需要移出等待树(wait_tree),状态更改 status &= ~W 即可。有一个前提条件就是不管何种运行状态的协程, 都在就绪队列中,只是同时包含有其他的运行状态。
调度器与运行体的功能界限
-
每一协程都需要使用的而且可能会不同属性的,就是协程属性。每一协程都需要的而且数据一致的,就是调度器的属性。
- 比如栈大小的数值,每个协程都一样的后不做更改可以作为调度器的属性,如果每个协程大小不一致,则可 以作为协程的属性。
-
用来管理所有协程的属性,作为调度器的属性。
- 比如 epoll 用来管理每一 个协程对应的 IO,是需要作为调度器属性。
7 协程结构体定义
协程的核心结构体如下:
typedef struct _nty_coroutine {nty_cpu_ctx ctx;proc_coroutine func;void *arg;size_t stack_size;nty_coroutine_status status;nty_schedule *sched;uint64_t birth;uint64_t id;void *stack;RB_ENTRY(_nty_coroutine) sleep_node;RB_ENTRY(_nty_coroutine) wait_node;TAILQ_ENTRY(_nty_coroutine) ready_next;TAILQ_ENTRY(_nty_coroutine) defer_next;
} nty_coroutine;
按照前面几章的描述,定义一个协程结构体需要多少域,我们描述了每一 个协程有自己的上下文环境,
- 需要保存 CPU 的寄存器 ctx;
- 需要有子过程的回 调函数 func;
- 需要有子过程回调函数的参数 arg;
- 需要定义自己的栈空间stack;
- 需要有自己栈空间的大小 stack_size;
- 需要定义协程的创建时间birth;
- 需要定义协程当前的运行状态 status;
- 需要定当前运行状态的结点 (ready_next, wait_node, sleep_node);
- 需要定义协程 id;需要定义调度器的全局对象 sched。
协程结构体用于存储协程的相关信息,如协程的状态(运行、暂停、终止等)、协程的上下文(保存的寄存器值等)、协程的栈指针等。通过定义合理的协程结构体,方便对协程进行管理和操作 。
#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>// 协程ID类型定义
typedef int co_id;// 协程执行函数类型
typedef void *(*coroutine_entry)(void *);// 协程状态枚举
typedef enum {CO_READY, // 就绪状态CO_RUNNING, // 运行状态CO_SUSPENDED, // 暂停状态CO_TERMINATED // 终止状态
} co_status;// 协程结构体定义
struct coroutine {co_id id; // 协程IDucontext_t ctx; // 协程上下文coroutine_entry func; // 协程执行函数void *arg; // 传递给函数的参数void *result; // 函数返回结果co_status status; // 协程状态char *stack; // 协程栈指针size_t stack_size; // 协程栈大小int fd; // 文件描述符(用于IO操作)
};// 协程管理器结构
struct coroutine_manager {struct coroutine **coroutines; // 协程数组int capacity; // 协程池容量int count; // 当前协程数量co_id current; // 当前运行的协程IDucontext_t main_ctx; // 主上下文
};// 全局协程管理器
static struct coroutine_manager *co_manager = NULL;// 创建新协程
co_id create_coroutine(coroutine_entry entry, void *arg) {if (co_manager == NULL) {// 初始化协程管理器co_manager = (struct coroutine_manager *)malloc(sizeof(struct coroutine_manager));co_manager->capacity = 100;co_manager->count = 0;co_manager->current = -1;co_manager->coroutines = (struct coroutine **)calloc(co_manager->capacity, sizeof(struct coroutine *));}// 创建新协程struct coroutine *co = (struct coroutine *)malloc(sizeof(struct coroutine));co->id = co_manager->count++;co->func = entry;co->arg = arg;co->result = NULL;co->status = CO_READY;co->stack_size = 64 * 1024; // 64KB栈大小co->stack = (char *)malloc(co->stack_size);co->fd = -1;// 保存到协程管理器if (co_manager->count > co_manager->capacity) {// 扩容逻辑co_manager->capacity *= 2;co_manager->coroutines = (struct coroutine **)realloc(co_manager->coroutines, co_manager->capacity * sizeof(struct coroutine *));}co_manager->coroutines[co->id] = co;// 初始化上下文getcontext(&co->ctx);co->ctx.uc_stack.ss_sp = co->stack;co->ctx.uc_stack.ss_size = co->stack_size;co->ctx.uc_link = &co_manager->main_ctx;// 设置协程入口函数makecontext(&co->ctx, (void (*)(void))entry, 1, arg);return co->id;
}// 启动协程调度
void schedule() {if (co_manager == NULL || co_manager->count == 0) {return;}// 简单的轮询调度for (int i = 0; i < co_manager->count; i++) {co_id next = (co_manager->current + 1) % co_manager->count;struct coroutine *co = co_manager->coroutines[next];if (co->status == CO_READY || co->status == CO_SUSPENDED) {co_manager->current = next;co->status = CO_RUNNING;swapcontext(&co_manager->main_ctx, &co->ctx);}}
}// 让出CPU,暂停当前协程
void yield() {if (co_manager == NULL || co_manager->current == -1) {return;}struct coroutine *co = co_manager->coroutines[co_manager->current];co->status = CO_SUSPENDED;swapcontext(&co->ctx, &co_manager->main_ctx);
}// 协程示例函数
void *coroutine_function(void *arg) {int id = *(int *)arg;printf("Coroutine %d started\n", id);for (int i = 0; i < 5; i++) {printf("Coroutine %d running step %d\n", id, i);yield(); // 让出CPU}printf("Coroutine %d finished\n", id);return NULL;
}// 主函数示例
int main() {int args1 = 1, args2 = 2;// 创建两个协程co_id co1 = create_coroutine(coroutine_function, &args1);co_id co2 = create_coroutine(coroutine_function, &args2);// 启动调度schedule();// 清理资源for (int i = 0; i < co_manager->count; i++) {free(co_manager->coroutines[i]->stack);free(co_manager->coroutines[i]);}free(co_manager->coroutines);free(co_manager);return 0;
}
8. 调度器的定义(struct scheduler )
以下为调度器的定义:
typedef struct _nty_coroutine_queue nty_coroutine_queue;typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;typedef struct _nty_schedule { uint64_t birth;nty_cpu_ctx ctx;struct _nty_coroutine *curr_thread;int page_size;int poller_fd;int eventfd;struct epoll_event eventlist[NTY_CO_MAX_EVENTS];int nevents;int num_new_events;nty_coroutine_queue ready;nty_coroutine_rbtree_sleep sleeping;nty_coroutine_rbtree_wait waiting;} nty_schedule;
调度器的属性:
- 需要有保存 CPU 的寄存器上下文 ctx,
- 可以从协程运行状态yield 到调度器运行的sleep。
- 从协程到调度器用yield,
- 从调度器到协程用 resume
调度器管理协程,一般包含就绪协程队列(存放可执行协程 )、sleep 协程集合(存放处于睡眠等待特定时间的协程 )、运行时协程队列(记录当前正在运行协程 )、等待协程集合(存放等待特定事件,如 IO 事件的协程 ) 。示例代码(伪代码 ):
struct scheduler {queue_t ready_queue; // 就绪协程队列set_t sleep_set; // sleep协程集合struct coroutine* running; // 当前运行协程set_t wait_set; // 等待协程集合// 其他辅助成员,如epoll相关用于事件监听等int epfd;
};
9. 调度器的执行策略
调度器的实现,有两种方案,一种是生产者消费者模式,另一种多状态运行。
生产者消费者模式
while (1) {//遍历睡眠集合,将满足条件的加入到 ready
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) { TAILQ_ADD(&sched->ready, expired);
}//遍历等待集合,将满足添加的加入到 ready
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1); for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
TAILQ_ADD(&sched->ready, wait);
}// 使用 resume 回复 ready 的协程运行权
while (!TAILQ_EMPTY(&sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
多状态运行
while (1) {//遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) { resume(expired);
}//遍历等待集合,使用 resume 恢复wait 的协程运行权
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1); for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}// 使用 resume 恢复 ready 的协程运行权
while (!TAILQ_EMPTY(sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
- 调度器定期检查
sleep集合
,查看是否有协程超时,将超时协程移到ready队列
。 - 通过事件机制(如 epoll )检查
wait集合
,当某事件(如某文件描述符可读 )发生时,将相应协程从wait集合
取出并恢复执行,移入ready队列
。 - 从
ready队列
取出协程,切换上下文使其运行。
10. 与 posix api 做到一致
主要是在协程实现过程中,对于涉及 IO 等操作时,尽量模仿或兼容 POSIX API 的行为和语义。比如在处理文件描述符相关操作时,协程中对读写操作的处理方式和返回值约定等与 POSIX 标准的read
、write
等函数保持相似,这样可让基于协程的代码能更好与现有的基于 POSIX API 的系统和库集成,减少适配成本。
11. 协程的执行流程
- 协程创建:分配协程上下文结构体并初始化,设置运行函数、参数等信息,将协程加入调度器管理。
- 运行:调度器选择一个就绪协程,切换到该协程上下文开始执行。遇到 IO 操作时,协程通过
yield
(让出执行权 )原语暂停执行,将自己放入等待集合,调度器切换到其他就绪协程执行。 - 恢复:当 IO 操作完成(通过事件通知,如 epoll 检测到文件描述符就绪 )或等待条件满足,调度器将对应协程从等待集合移到就绪队列,在合适时机恢复其执行,通过
resume
原语切换回该协程上下文继续运行。 - 结束:协程执行完函数逻辑或显式调用结束相关操作,从调度器中删除,释放协程上下文资源。
12. 协程的多核模式
- 多线程 + 协程:在多核 CPU 上,创建多个线程,每个线程内运行多个协程。线程作为 CPU 调度基本单位,利用多核并行能力,而协程在每个线程内进行更细粒度的任务调度,提升单线程内任务执行效率。**(因为多个线程会使用同一个调度器,所以需要考虑线程加锁的问题)**比如一个网络服务器程序,可开多个工作线程,每个线程内用协程处理多个客户端连接请求和数据交互。
- 亲和性设置:可设置协程或线程与特定 CPU 核心的亲和性,让某个线程或协程固定在某核心上运行,减少跨核心调度带来的缓存失效等开销,提高性能。
13. 协程的性能测试
- 基准测试:编写简单程序,如多个协程循环执行固定次数的简单计算任务,记录总执行时间,对比不同协程实现或不同参数配置下的性能差异。
- IO 密集型场景测试:模拟实际 IO 密集场景,如大量网络请求、文件读写操作。记录单位时间内协程处理的请求数量、响应延迟等指标。例如用协程实现 HTTP 服务器,测试其在高并发请求下的吞吐量和平均响应时间。
- 多核心性能测试:在多核 CPU 环境下,测试不同负载下协程的性能表现,观察是否能有效利用多核资源,如测量随着核数增加,协程任务处理速度的提升幅度是否接近线性。
2.3.2 协程调度器实现与性能测试
1 调度器的定义分析
协程调度器负责决定哪个协程在何时获得执行权,类似于操作系统的进程调度器。它要管理协程的生命周期,包括创建、调度、暂停、恢复和销毁等操作,合理分配 CPU 时间片给各个协程,以实现高效的并发执行 。
2 超时集合、就绪队列、I/O 等待集合的实现
- 超时集合:用于管理那些设置了超时时间的协程,调度器会定期检查这个集合,当某个协程超时时间到达时,进行相应处理,如唤醒协程执行超时处理逻辑 。
- 就绪队列:存放已经准备好可以执行的协程,调度器从就绪队列中选择协程来执行,通常会按照一定的调度策略(如先来先服务、优先级调度等)来挑选 。
- I/O 等待集合:保存因进行 I/O 操作(如网络请求、文件读写)而处于等待状态的协程,当对应的 I/O 操作完成时,将协程从该集合移到就绪队列,使其可被调度执行 。
3 协程调度的执行流程
调度器首先从就绪队列中选取一个协程,通过协程切换原语(如 switch
)将执行权交给该协程。协程执行过程中,若遇到 I/O 操作,就将自身加入 I/O 等待集合并主动让出执行权;若设置了超时,调度器会在超时集合中检查并处理。当 I/O 操作完成或超时时间到,协程会被重新放回就绪队列等待下次调度 。
4 协程接口实现、异步流程实现
- 协程接口实现:为开发者提供创建、启动、暂停、恢复等协程操作的接口,方便在程序中使用协程。这些接口要封装好底层的协程实现细节,让开发者能像调用普通函数一样使用协程 。
- 异步流程实现:利用协程实现异步操作流程,例如在进行数据库查询、网络请求等耗时操作时,通过协程切换让程序在等待响应时去执行其他任务,而不是阻塞等待,实现异步非阻塞的编程模型 。
5 hook 钩子的实现
Hook 钩子是一种在程序执行流程中的特定位置插入自定义代码的机制。在协程中,可在协程创建、切换、销毁等关键节点设置钩子函数,用于监控协程状态、进行性能统计、添加额外的逻辑处理等。例如,在协程切换时插入钩子函数,记录切换次数和时间,便于分析协程的执行效率 。
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <ucontext.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>ucontext_t ctx[3]; // 用于存储三个协程的上下文信息
ucontext_t main_ctx; // 用于存储主程序的上下文信息
int count = 0; // 计数器,用于控制协程循环执行的次数// hook相关类型定义
typedef ssize_t (*read_t)(int fd, void *buf, size_t count);
read_t read_f = NULL; // 用于存储原始的read函数指针typedef ssize_t (*write_t)(int fd, const void *buf, size_t count);
write_t write_f = NULL; // 用于存储原始的write函数指针// 重定义read函数,实现对read操作的hook
ssize_t read(int fd, void *buf, size_t count) {ssize_t ret = read_f(fd, buf, count); // 调用原始的read函数进行读取操作printf("read: %s\n", (char *)buf); // 打印读取到的内容return ret; // 返回读取的字节数
}// 重定义write函数,实现对write操作的hook
ssize_t write(int fd, const void *buf, size_t count) {printf("write: %s\n", (const char *)buf); // 打印要写入的内容return write_f(fd, buf, count); // 调用原始的write函数进行写入操作
}void init_hook(void) {if (!read_f) {// 通过dlsym获取下一个(原始的)read函数指针read_f = dlsym(RTLD_NEXT, "read"); }if (!write_f) {// 这里存在错误,应该获取write函数指针,却错误地获取read函数指针write_f = dlsym(RTLD_NEXT, "write"); }
}
ssize_t
类型的作用
ssize_t
是 C 语言中用于表示带符号的字节数的类型,通常用于系统调用的返回值:
- 正数:表示成功读取 / 写入的字节数
- 0:表示文件结束(EOF)
- -1:表示错误(通过
errno
获取具体错误码)
在 read/write
系统调用中,返回值可能为负(表示错误),因此使用有符号整数类型 ssize_t
而非无符号的 size_t
。
2. 函数指针的作用
这段代码中的函数指针(如 read_t
和 write_t
)用于存储原始系统函数的地址,以便在 hook 函数中调用它们。具体步骤:
- 保存原始函数地址:通过
dlsym(RTLD_NEXT, "read")
获取系统原本的read
函数地址 - 替换原始函数:定义同名的
read
函数,在其中调用保存的原始函数 - 添加额外逻辑:在调用前后添加自定义代码(如日志记录)
这是典型的函数钩子(Function Hook) 实现方式。
3. 回调函数在哪里?
在这段代码中,没有显式的回调函数。回调函数通常用于异步操作,而此处的 hook 是同步的。代码中的 read/write
函数直接调用原始系统函数并返回结果。
如果需要实现异步操作,可以在 hook 函数中注册回调(例如通过 aio_read
或 libevent
),但当前代码并未涉及这部分。
6 协程实现 mysql 请求
利用协程实现对 MySQL 数据库的异步请求。当发起 MySQL 请求时,协程不会阻塞等待结果返回,而是切换去执行其他任务。等 MySQL 数据库返回结果后,再通过调度器将对应协程恢复执行,处理返回的数据,提高程序处理数据库操作的并发能力 。
7 协程多核方案分析
在多核 CPU 环境下,要考虑如何充分利用多核资源来提高协程的执行效率。比如可将不同的协程分配到不同的 CPU 核心上执行,避免单核瓶颈;或者采用多核调度策略,让协程在多核间合理迁移,平衡负载,充分发挥多核 CPU 的性能优势 。
8 协程性能测试
通过各种性能测试工具和方法,对协程的性能进行评估。例如,测试协程的创建、切换开销,协程并发处理任务时的吞吐量、响应时间等指标,分析协程在不同负载下的性能表现,找出性能瓶颈并进行优化 。
「作业:测试协程框架的百万并发」
测试环境:4 台 VMWare 虚拟机 1 台服务器 6G 内存,4 核 CPU 3 台客户端 2G 内存,2 核 CPU 操作系统:ubuntu 14.04
服务器端测试代码:https://github.com/wangbojing/NtyCo 客户端测试代码:
https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport _epoll.c
按照每一个连接启动一个协程来测试。每一个协程栈空间 4096byte 6G 内存 -> 测试协程数量 100W 无异常。并且能够正常收发数据。
「协程如何写到简历中去」
例如:项目描述:图床产品用于存储和管理图片资源,为用户提供便捷的图片上传、存储及访问服务。项目中使用 ntyco 网络框架搭建 webserver,利用其协程特性实现高并发网络请求处理;基于 dkvstore 组件构建 kv 存储模块,用于存储图片元数据等信息;同时集成 redis 进行缓存加速,借助 mediahub 进行多媒体资源管理与分发。