当前位置: 首页 > ai >正文

【Linux高级全栈开发】2.3.1 协程设计原理与汇编实现2.3.2 协程调度器实现与性能测试

【Linux高级全栈开发】2.3.1 协程设计原理与汇编实现2.3.2 协程调度器实现与性能测试

一、协程基础概念与存在意义
  1. 协程定义

    • 轻量级用户态线程,通过主动调度实现单线程内多任务协作,兼具同步编程逻辑与异步性能。
  2. 核心价值

    • 解决同步 IO 阻塞问题,避免多线程高开销,适用于 IO 密集型场景(如网络请求、数据库操作)。
  3. 与线程对比

    特性协程线程
    创建成本微秒级(极低)毫秒级(较高)
    内存占用几 KB几 MB
    调度方式非抢占式(主动让出)抢占式(系统调度)
    适用场景IO 密集型CPU 密集型
二、协程实现原理与关键技术
  1. 核心原语操作
    • create:创建协程,分配栈空间并初始化上下文。
    • resume:恢复协程执行,切换至目标协程上下文。
    • yield:主动让出执行权,保存当前协程状态并切换至调度器。
    • switch:底层上下文切换,保存 / 恢复寄存器状态(如 EIP、ESP、通用寄存器)。
  2. 上下文切换实现方式
    • setjmp/longjmp:跨平台但破坏栈结构,适用于简单场景。
    • ucontext:系统级上下文管理,通过swapcontext实现切换,移植性受限。
    • 汇编实现:直接操作寄存器(如 x86 的 EIP、ESP),性能高但开发难度大。
  3. 寄存器操作关键逻辑
    • 保存当前状态:将ESP(栈指针)、EBP(基址指针)、EIP(指令指针)及通用寄存器值存入协程上下文结构体。
    • 恢复目标状态:从目标协程上下文中读取寄存器值,通过ret指令跳转至目标EIP
三、协程调度器设计与实现
  1. 调度器核心功能

    • 管理协程生命周期(就绪、睡眠、等待状态),通过事件驱动(如 epoll)处理 IO 就绪事件,触发协程恢复。
  2. 数据结构设计

    • 就绪队列(Ready Queue):存储可执行协程,采用双向链表实现快速入队 / 出队。
    • 睡眠树(Sleep Tree):基于红黑树管理超时协程,按剩余时间排序。
    • 等待树(Wait Tree):通过红黑树关联 IO 事件(如文件描述符)与协程,结合 epoll 检测事件就绪。
  3. 执行流程

    while (1) {// 处理超时协程expire_co = sleep_tree_check();if (expire_co) add_to_ready_queue(expire_co);// 监听IO事件nready = epoll_wait(epfd, events, MAX_EVENTS);for (i=0; i<nready; i++) {wait_co = wait_tree_search(events[i].fd);if (wait_co) add_to_ready_queue(wait_co);}// 调度就绪协程while ((ready_co = pop_from_ready_queue())) {resume(ready_co); // 切换至协程上下文执行}
    }
    
四、协程性能优化与测试
  1. 性能优化方向
    • 栈空间管理:采用固定大小栈(如 4KB)或动态扩展栈,减少内存碎片。
    • 多核支持:多线程 + 协程模式,每个线程独立维护调度器,通过 CPU 亲和性绑定核心。
    • Hook 机制:对系统调用(如read/write)进行钩子拦截,实现异步化改造。
  2. 性能测试案例
    • 百万并发测试:使用NtyCo框架在 4 核 6G 服务器上创建 100 万协程,单连接栈空间 4KB,无内存溢出且正常收发数据。
    • 对比指标:同步模式(6500ms/1000 连接) vs 异步协程模式(900ms/1000 连接),响应时间提升约 86%。
五、协程在项目中的应用与简历描述
  1. 典型应用场景
    • 高并发 Web 服务器(如 Nginx 协程扩展)、异步数据库访问(如 MySQL 请求非阻塞处理)、微服务架构中的异步通信。
  2. 简历撰写示例
    • 项目名称:高并发图床服务
    • 技术点:使用ntyco协程框架实现百万级连接处理,通过 epoll + 协程非阻塞模型提升 IO 效率,对比传统线程模型吞吐量提升 300%。
    • 职责:设计协程调度器,优化上下文切换性能;集成 Redis 缓存,通过协程异步预加载提升响应速度。
六、扩展与进阶
  1. 协程与 DPDK 结合:在用户态协议栈中使用协程处理网络包,减少内核态上下文切换开销。
  2. 语言层协程支持:如 Go 的 Goroutine、Python 的asyncio,底层均基于类似原理实现。

2.3.1 协程设计原理与汇编实现(9个问题)

1 为什么要有协程?协程存在的 3 个原因
  • 协程存在的原因解决的问题:如何把同步的 recv/send 改为异步的 recv/send

  • 以Linux 为例,在这里需要介绍一个“网红 ”就是 epoll。服务器使用 epoll 管理 百万计的客户端长连接,代码框架如下:

    while (1) {int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);for (i = 0;i < nready;i ++) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {int connfd = accept(listenfd, xxx, xxxx);setnonblock(connfd);ev.events = EPOLLIN | EPOLLET;ev.data.fd = connfd;epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);} else {handle(sockfd);}}
    }
    
    • 对于服务器处理百万计的 IO。Handle(sockfd)实现方式有两种。

      • *第一种*,handle(sockfd)函数内部对 sockfd 进行读写动作。handle 的 io 操作(send,recv)与 epoll_wait 是在同一个处理流程里面的。 这就是 IO 同步操作。每 1000 个连接接入的服务器响应时间(6500ms 左右)。
      • *第二种*,handle(sockfd)函数内部将 sockfd 的操作,push 到线程池中,Handle 函数是将 sockfd 处理方式放到另一个已经其他的线程中运行,如此做 法,将 io 操作(recv,send)与 epoll_wait 不在一个处理流程里面,使得 io 操作(recv,send)与 epoll_wait 实现解耦。这就叫做 IO 异步操作。IO 异步操作,每 1000 个连接接入的服务器响应时间(900ms 左右)。
    • 优点:

      1. 子模块好规划。
      2. 程序性能高。
    • 缺点:正因为子模块好规划,使得模块之间的 sockfd 的管理异常麻烦。每一个子线程 都需要管理好 sockfd,避免在 IO 操作的时候,sockfd 出现关闭或其他异常。

  • 有没有一种方式,有异步性能,同步的代码逻辑。来方便编程人员对 IO 操作的 组件呢? 有,采用一种轻量级的协程来实现。在每次 send 或者 recv 之前进行 切换,再由调度器来处理 epoll_wait 的流程。

  • 传统同步编程在遇到 IO 操作(如网络请求、文件读写 )时,线程会阻塞等待,期间 CPU 闲置,浪费资源。多线程虽可并发处理任务,但线程上下文切换开销大,且存在同步、互斥等复杂问题。协程作为轻量级用户态线程,能在单线程内实现多个任务的协作式调度,避免阻塞等待,以同步编程方式获得异步性能,适合 IO 密集型场景。

  • 同步与异步性能形容两者之间的关系,同步比异步要慢:同步编程模式下,程序按顺序执行,在某些操作(如 I/O 操作)等待时会阻塞,浪费 CPU 时间。异步编程可让程序在等待时去处理其他任务,提高效率。协程是异步编程的一种实现方式,能更好地平衡同步和异步的性能,避免不必要的等待和资源浪费 。

    • 同步执行示例:
    //dns_client_commit(argv[1]);
    int count = sizeof(domain) / sizeof(domain[0]);
    int i = 0;for (i = 0;i < count;i ++) {dns_client_commit(domain[i]);
    }
    
    • 异步执行示例:

      struct async_context *ctx = dns_async_client_init();
      if (ctx == NULL) return -2;
      int count = sizeof(domain) / sizeof(domain[0]);
      int i = 0;
      for (i = 0;i < count;i ++) {dns_async_client_commit(ctx, domain[i], dns_async_client_result_callback);
      //sleep(2);}
      getchar();
      
  • 服务端异步处理:在服务端,大量请求到来时,若采用传统同步处理,每个请求都要等待前面的处理完,会造成性能瓶颈。协程可使服务端在处理一个请求的耗时操作(如数据库查询)时,切换去处理其他请求,充分利用资源,提升服务端并发处理能力。

    • 把检测是否就绪的 epoll_wait与读写 iorecv/send不放到一个流程中,把检测到的io丢到线程池里去读写

      if (nRun) {printf(" New Data is Comming\n");client_data_process(clientfd);
      } else {//	epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);client_t *rClient = (client_t*)malloc(sizeof(client_t));memset(rClient, 0, sizeof(client_t));				rClient->fd = clientfd;job_t *job = malloc(sizeof(job_t));job->job_function = client_job;job->user_data = rClient;workqueue_add_job(&workqueue, job);
      }//993ms
      
      while(1){
      int nready = epoll_wait();
      for(i=0;i<nready;i ++){recv(event[i].fd,buffer);send(event[i].fd,buffer);
      }
      
    • 检测与读写io在一个流程中:

      client_data_process(clientfd); // 5600ms
      
      void *task_callback(void *arg{recv(event[il.fd, buffer);send(event[i].fd, buffer);
      }
      while(1){int nready = epoll_wait();for(i=0; i<nready; i ++){task.fd = event[i].fd;task.callback = task_callback;push_task_to_threadpool(&task);}
      }
      
  • 客户端异步请求:客户端发起网络请求等操作时,可能有数十个接口需要去请求,若用同步方式,请求会是一种串行,一个崩溃的话,后续界面可能会卡住。协程可实现异步请求,在等待响应时,请求是并行的,客户端界面仍可响应用户操作,提升用户体验 。

  • 异步请求应用场景速览

    1. DNS:非阻塞解析域名,避免单个查询阻塞整个服务(如 Node.js dns.promises)。
    2. HTTP:高并发请求(如爬虫、微服务调用),单线程处理数千连接(如 Python aiohttp、JavaScript fetch)。
    3. Redis:异步读写缓存 / 队列,提升吞吐量(如 Python redis.asyncio、Node.js ioredis)。
    4. Kafka:异步生产 / 消费消息,支持百万级 TPS(如 Python aiokafka、Java KafkaConsumer 异步 API)。
    5. MongoDB:异步数据库操作,减少 IO 等待(如 Python motor、Node.js mongoose 异步方法)。
    6. 抖音
      • 视频播放页:主视频流分片加载的同时,预加载推荐视频列表、加载评论区内容以及实时更新互动数据等。
      • 直播场景:视频流拉取的同时,通过长连接实时推送弹幕消息、实时刷新观众列表以及加载主播信息和商品橱窗等。
    7. 微信
      • 朋友圈浏览:图文内容瀑布流加载时,预加载好友头像及个人信息,同时同步点赞与评论数据。
      • 群聊场景:批量拉取历史消息的同时,实时更新在线成员状态,预加载图片或视频缩略图。
    8. 淘宝
      • 商品详情页:加载商品主图及详情图的同时,实时查询价格与库存,获取用户评价并加载推荐商品。
      • 购物车结算:多商品价格计算与库存锁定验证同时进行,查询收货地址与配送方式,计算支付优惠。
    9. 美团
      • 商家详情页:菜品图片与价格请求的同时,加载用户评价与评分,查询商家位置与配送范围。
      • 即时配送:实时更新骑手位置与订单状态轮询同时进行,规划地图路径并确认支付状态。
  • 异步的缺点:代码逻辑比较复杂,不符合人的思维逻辑,比如回调函数嵌套编写,很难理解

  • 协程的核心价值:(同步编程,异步性能)轻量级线程,由程序主动调度(非操作系统内核),适合 IO 密集型场景。通过同步风格编写异步代码(如 Python async/await、Go goroutine),避免回调地狱,显著提升并发效率。协程(Coroutine)是一种比线程更加轻量级的并发编程模型,它允许程序在执行过程中暂停和恢复,从而高效地处理异步操作。

  • 协程的核心原理

    协程的实现依赖于以下技术:

    1. 非抢占式调度:协程主动让出控制权(通过 yieldawait 等)。
    2. 状态保存:暂停时保存局部变量和执行位置。
    3. 事件循环:管理协程的调度和恢复(如 Python 的 asyncio 循环)。
  • 协程 vs 线程

    特性协程线程
    创建成本极低(微秒级)较高(毫秒级)
    内存占用几 KB几 MB
    调度方式非抢占式(主动让出)抢占式(系统调度)
    适用场景IO 密集型任务(如网络请求)CPU 密集型任务
2 协程实现过程及原语操作

协程如何使用?与线程使用有何区别?

每请求每线程的代码如下:

while(1) {socklen_t len = sizeof(struct sockaddr_in);int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);pthread_t thread_id;pthread_create(&thread_id, NULL, client_cb, &clientfd);}

协程参考代码如下:

while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}

线程的API思维来使用协程,函数调用的性能来测试协程。

NtyCo 封装出来了若干接口,一类是协程本身的,二类是 posix 的异步封装 协程 API:while

  1. 协程创建 int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

  2. 协程调度器的运行 void nty_schedule_run(void)

  3. POSIX 异步封装 API:

    int nty_socket(int domain, int type, int protocol)
    int nty_accept(int fd, struct sockaddr *addr, socklen_t *len) int nty_recv(int fd, void *buf, int length)
    int nty_send(int fd, const void *buf, int length)
    int nty_close(int fd)
    
协程的实现之工作流程

*3.1* *创建协程*

当我们需要异步调用的时候,我们会创建一个协程。比如 accept 返回一个新的sockfd,创建一个客户端处理的子过程。再比如需要监听多个端口的时候,创建一个server 的子过程,这样多个端口同时工作的,是符合微服务的架构的。

创建协程的时候,进行了如何的工作?创建 API 如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

  • 参数 1 :nty_coroutine **new_co,需要传入空的协程的对象,这个对象是由内部 创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。
  • 参数 2:proc_coroutine func,协程的子过程。当协程被调度的时候,就会执行该 函数。
  • 参数 3:void *arg,需要传入到新协程中的参数。

*3.2* *实现* *IO* *异步操作*

大部分的朋友会关心 IO 异步操作如何实现,在 send 与 recv 调用的时候,如何实现异 步操作的。

先来看一下一段代码:

while (1) {
int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);for (i = 0;i < nready;i ++) {int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
int connfd = accept(listenfd, xxx, xxxx);setnonblock(connfd);ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);} else {epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL); recv(sockfd, buffer, length, 0);//parser_proto(buffer, length);send(sockfd, buffer, length, 0);
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL); }
}
}

在进行 IO 操作(recv,send)之前,先执行了 epoll_ctl 的 del 操作,将相应的 sockfd 从 epfd 中删除掉,在执行完 IO 操作(recv,send)再进行 epoll_ctl 的 add 的动作。这段代码看起来 似乎好像没有什么作用。

如果是在多个上下文中,这样的做法就很有意义了。能够保证 sockfd 只在一个上下文中 能够操作 IO 的。不会出现在多个上下文同时对一个 IO 进行操作的。协程的 IO 异步操作正 式是采用此模式进行的。

把单一协程的工作与调度器的工作的划分清楚,先引入两个原语操作 resumeyield ,yield 就是让出运行,resume 就是恢复运行。调度器与协程的上下文切换如下图所示

在协程的上下文 IO 异步操作(nty_recv ,nty_send)函数,步骤如下:

  1. 将 sockfd 添加到 epoll 管理中。
  2. 进行上下文环境切换,由协程上下文 yield 到调度器的上下文。
  3. 调度器获取下一个协程上下文。Resume 新的协程 IO 异步操作的上下文切换的时序图如下:
协程的核心原语操作
  • create:创建一个协程:

    1. 调度器是否存在,不存在也创建。调度器作为全局的单例。将调度 器的实例存储在线程的私有空间 pthread_setspecific。

    2. 分配一个 coroutine 的内存空间,分别设置 coroutine 的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。

    3. 将新分配协程添加到就绪队列 ready_queue 中 实现代码如下:

      int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func,void *arg) {assert(pthread_once(&sched_key_once,nty_coroutine_sched_key_creator) == 0);nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) {nty_schedule_create(0);sched = nty_coroutine_get_sched();if (sched == NULL) {printf("Failed to create scheduler\n");return -1;}}nty_coroutine *co = calloc(1, sizeof(nty_coroutine));if (co == NULL) {printf("Failed to allocate memory for new coroutine\n"); return -2;}//int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);if (ret) {printf("Failed to allocate stack for new coroutine\n"); free(co);return -3; }co->sched = sched;co->stack_size = sched->stack_size;co->status = BIT(NTY_COROUTINE_STATUS_NEW); //co->id = sched->spawned_coroutines ++;co->func = func;co->fd = -1;co->events = 0;co->arg = arg;co->birth = nty_coroutine_usec_now();*new_co = co;TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
      
    4. yield: 让出 CPU。

      oid nty_coroutine_yield(nty_coroutine *co)

      参数:当前运行的协程实例

      调用后该函数不会立即返回,而是切换到最近执行 resume 的上下文。该函 数返回是在执行 resume 的时候,会有调度器统一选择 resume 的,然后再 次调用 yield 的。resume 与 yield 是两个可逆过程的原子操作。

    5. resume:恢复协程的运行权

      nty_coroutine_resume(nty_coroutine *co)

      参数:需要恢复运行的协程实例

      调用后该函数也不会立即返回,而是切换到运行协程实例的 yield 的位 置。返回是在等协程相应事务处理完成后,主动 yield 会返回到 resume 的地 方。

  • switch:用于协程间的切换,保存当前协程的执行状态(如寄存器的值、程序计数器等),并恢复目标协程的执行状态,让程序执行流从一个协程转移到另一个协程 。

  • resume:用于恢复一个挂起的协程的执行,将协程从暂停状态切换到运行状态,继续执行协程内后续的代码 。

  • yield:表示当前协程主动让出执行权,将控制权交给调度器或其他协程,自身进入暂停状态,待合适时机再被唤醒继续执行 。

3 协程切换的三种实现方式

原语操作相关

setjmp/longjmp
  • 提供低级非局部跳转,setjmp保存当前上下文,longjmp恢复并跳转。但使用时需注意保证协程栈空间已建立且未退出,且可能影响程序可维护性和可读性 。

    #include <stdio.h>
    #include <setjmp.h>jmp_buf env;  // 用于存储程序上下文环境的缓冲区// 测试函数,接收一个整数参数
    void func(int arg) {printf("Calling func with arg = %d\n", arg);  // 打印当前参数值longjmp(env, ++arg);  // 跳回到setjmp处,并将arg+1作为返回值
    }int main() {  // 程序入口点int ret = setjmp(env);  // 设置跳转点,首次执行返回0printf("Returned from longjmp with value: %d\n", ret);  // 打印返回值if (ret == 0) {printf("First call (ret == 0)\n");  // 首次调用分支func(ret);  // 调用func函数,触发longjmp} else if (ret == 1) {printf("Second call (ret == 1)\n");  // 第二次调用分支func(ret);  // 再次调用func函数} else if (ret == 2) {printf("Third call (ret == 2)\n");  // 第三次调用分支func(ret);  // 第三次调用func函数} else {printf("Final call (ret >= 3)\n");  // 最终调用分支printf("Exiting program.\n");  // 输出退出信息}return 0;  // 程序正常结束
    }
    
ucontext
  • 在类 Unix 系统(如 Linux )中用于实现协程和用户态线程,通过保存和恢复 CPU 寄存器、堆栈指针等状态,允许程序在不同执行流间切换。主要函数有swapcontext(ucontext_t *oucp, const ucontext_t *ucp) ,用于保存当前上下文到oucp,并切换到ucp上下文 。

    #include <ucontext.h>
    // 定义两个ucontext_t类型的变量,用于存储协程上下文
    ucontext_t ctx[2]; 
    // 定义主上下文变量,用于存储主程序的上下文
    ucontext_t main_ctx; 
    // 定义计数器,用于控制循环次数
    int count = 0; // 协程函数func1,无参数,返回值为void
    void func1(void) { // 当count小于20时循环执行while (count ++ < 20) { printf("1\n"); // 将当前协程(ctx[0])的上下文切换到另一个协程(ctx[1])的上下文swapcontext(&ctx[0], &ctx[1]); printf("3\n"); }
    }// 协程函数func2,无参数,返回值为void
    void func2(void) { char a[4096] = {0}; // 当count小于20时循环执行while (count ++ < 20) { printf("2\n"); // 将当前协程(ctx[1])的上下文切换到另一个协程(ctx[0])的上下文swapcontext(&ctx[1], &ctx[0]); printf("4\n"); }
    }int main() {char stack1[2048] = {0};char stack2[2048] = {0};// 获取当前上下文并保存到ctx[0]中getcontext(&ctx[0]); // 设置ctx[0]对应的栈空间起始地址ctx[0].uc_stack.ss_sp = stack1; // 设置ctx[0]对应的栈空间大小ctx[0].uc_stack.ss_size = sizeof(stack1); // 设置ctx[0]的链接上下文为主程序上下文main_ctxctx[0].uc_link = &main_ctx; // 创建一个新的上下文环境,指定执行函数为func1,无参数makecontext(&ctx[0], func1, 0); // 获取当前上下文并保存到ctx[1]中getcontext(&ctx[1]); // 设置ctx[1]对应的栈空间起始地址ctx[1].uc_stack.ss_sp = stack2; // 设置ctx[1]对应的栈空间大小ctx[1].uc_stack.ss_size = sizeof(stack2); // 设置ctx[1]的链接上下文为主程序上下文main_ctxctx[1].uc_link = &main_ctx; // 创建一个新的上下文环境,指定执行函数为func2,无参数makecontext(&ctx[1], func2, 0); printf("swapcontext\n");// 将主程序上下文切换到ctx[0]对应的协程上下文,开始执行func1swapcontext(&main_ctx, &ctx[0]); printf("\n");return 0;
    }
    

    这段 C 代码主要利用ucontext.h头文件提供的函数来实现简单的协程切换功能。ucontext_t类型用于存储程序的上下文信息,包括寄存器状态、栈指针等。通过getcontext获取当前上下文,makecontext设置上下文要执行的函数等信息,swapcontext实现上下文之间的切换。

    代码定义了两个协程函数func1func2,在main函数中设置好它们的上下文环境后,通过swapcontext进行切换,使得两个协程函数交替执行,在循环条件(count < 20)满足的情况下,打印出特定的字符序列,从而实现协程之间的协作式多任务处理 。

    输出为:1 2 3 1 4 2 3 1 4 2 3 。。。

    • 更复杂的调度器代码
    #define _GNU_SOURCE
    #include <dlfcn.h>
    #include <stdio.h>
    #include <ucontext.h>
    #include <string.h>
    #include <unistd.h>
    #include <fcntl.h>ucontext_t ctx[3]; // 用于存储三个协程的上下文信息
    ucontext_t main_ctx; // 用于存储主程序的上下文信息
    int count = 0; // 计数器,用于控制协程循环执行的次数// hook相关类型定义
    typedef ssize_t (*read_t)(int fd, void *buf, size_t count);
    read_t read_f = NULL; // 用于存储原始的read函数指针typedef ssize_t (*write_t)(int fd, const void *buf, size_t count);
    write_t write_f = NULL; // 用于存储原始的write函数指针// 重定义read函数,实现对read操作的hook
    ssize_t read(int fd, void *buf, size_t count) {ssize_t ret = read_f(fd, buf, count); // 调用原始的read函数进行读取操作printf("read: %s\n", (char *)buf); // 打印读取到的内容return ret; // 返回读取的字节数
    }// 重定义write函数,实现对write操作的hook
    ssize_t write(int fd, const void *buf, size_t count) {printf("write: %s\n", (const char *)buf); // 打印要写入的内容return write_f(fd, buf, count); // 调用原始的write函数进行写入操作
    }void init_hook(void) {if (!read_f) {// 通过dlsym获取下一个(原始的)read函数指针read_f = dlsym(RTLD_NEXT, "read"); }if (!write_f) {// 这里存在错误,应该获取write函数指针,却错误地获取read函数指针write_f = dlsym(RTLD_NEXT, "read"); }
    }// 协程函数1
    void func1(void) {while (count ++ < 30) { // 当count小于30时循环执行printf("1\n");// 将当前协程(ctx[0])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[0], &main_ctx); printf("4\n");}
    }// 协程函数2
    void func2(void) {while (count ++ < 30) {printf("2\n");// 将当前协程(ctx[1])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[1], &main_ctx); printf("5\n");}
    }// 协程函数3
    void func3(void) {while (count ++ < 30) {printf("3\n");// 将当前协程(ctx[2])的上下文切换到主程序上下文(main_ctx)swapcontext(&ctx[2], &main_ctx); printf("6\n");}
    }int main() {init_hook(); // 初始化函数hookint fd = open("a.txt", O_CREAT | O_RDWR); // 以读写模式创建文件a.txtif (fd < 0) {return -1; // 打开文件失败则返回-1}char *str = "1234567890";write(fd, str, strlen(str)); // 向文件中写入字符串char buffer[128] = {0};read(fd, buffer, 128); // 从文件中读取内容到bufferprintf("buffer: %s\n", buffer); // 打印读取到的内容#if 1char stack1[2048] = {0};char stack2[2048] = {0};char stack3[2048] = {0};// 获取并设置第一个协程的上下文getcontext(&ctx[0]);ctx[0].uc_stack.ss_sp = stack1;ctx[0].uc_stack.ss_size = sizeof(stack1);ctx[0].uc_link = &main_ctx;makecontext(&ctx[0], func1, 0);// 获取并设置第二个协程的上下文getcontext(&ctx[1]);ctx[1].uc_stack.ss_sp = stack2;ctx[1].uc_stack.ss_size = sizeof(stack2);ctx[1].uc_link = &main_ctx;makecontext(&ctx[1], func2, 0);// 获取并设置第三个协程的上下文getcontext(&ctx[2]);ctx[2].uc_stack.ss_sp = stack3;ctx[2].uc_stack.ss_size = sizeof(stack3);ctx[2].uc_link = &main_ctx;makecontext(&ctx[2], func3, 0);printf("swapcontext\n");while (count <= 30) { // 调度器循环// 切换到对应的协程上下文执行,实现协程轮流执行swapcontext(&main_ctx, &ctx[count%3]); }printf("\n");
    #endifreturn 0;
    }
    
    • 整体功能:
    • 这段代码综合了函数 hook 和协程调度的功能。通过函数 hook,重定义了readwrite函数,使得在进行文件读写操作时能够打印出读写的内容。同时,利用ucontext.h中的函数实现了三个协程(func1func2func3),每个协程在满足一定循环条件下打印特定字符并进行上下文切换。在main函数中,通过调度器逻辑(while (count <= 30)循环以及swapcontext调用)来轮流执行这三个协程,实现了协程间的协作式多任务处理。
    • 调度器的作用:
    • 调度器(在代码中就是main函数里的while (count <= 30)循环部分)的存在是为了控制多个协程的执行顺序和时机。协程本身只是一段代码逻辑,它们不会自动按照期望的方式交替执行。通过调度器,利用swapcontext函数,可以有规律地切换到不同的协程上下文,让各个协程轮流获得执行机会,从而实现多个任务(这里是三个协程函数的执行)之间的协作运行,避免某个协程一直占用执行资源,提高程序的并发处理能力和资源利用率 。 此外,调度器还可以根据特定的条件(如这里的count值)来决定协程的执行次数和何时终止等逻辑 。 同时,结合前面的函数 hook 功能,使得在协程执行过程中涉及文件读写操作时,能够按照 hook 的逻辑打印出相关信息,整体上实现了一个较为复杂的功能组合 。 不过代码中init_hook函数里获取write_f时存在错误,应该修正为write_f = dlsym(RTLD_NEXT, "write");
汇编实现
  • 直接通过汇编语言操作寄存器等底层硬件资源来实现协程切换。优点是可以精确控制切换过程,性能更高;缺点是编写难度大,对开发者汇编语言和计算机底层知识要求高
三者优缺点对比
实现方式优点缺点
setjmp/longjmp- 跨平台性较好,在不同操作系统的 C 环境中基本可用。 - 提供了一种底层的非局部跳转机制,能实现复杂控制流或异常处理。- 代码实现相对复杂,使用不当易导致难以追踪的问题。 - 缺乏对栈管理等底层细节的自动处理,需开发者手动保障协程栈空间的合理存在,容易出错。 - 破坏了正常函数调用栈的结构和执行逻辑,使代码的可读性和可维护性变差 。
ucontext- 实现方式相对简洁,通过相关函数(如getcontextsetcontext等 )可方便地保存和恢复 CPU 寄存器、堆栈指针等状态。 - 适用于实现协程和轻量级任务调度,功能较为通用。- 跨平台性一般,不同操作系统对ucontext相关函数的支持和实现细节可能存在差异,移植性受限。 - 虽然使用比setjmp/longjmp方便,但仍需开发者对底层执行流切换有一定了解,存在误用风险。
汇编实现- 性能较高,能直接操作寄存器等硬件资源,减少不必要开销,实现高效的上下文切换。 - 可定制性强,能针对特定需求精细控制切换过程。- 不同体系结构(如 x86、ARM 等)汇编代码差异大,需为不同硬件平台单独编写和维护代码,跨平台能力弱。 - 对开发者要求高,需熟悉汇编语言和底层硬件知识,开发难度大,代码可读性和可维护性差。
4 汇编实现 - 寄存器讲解
  • 基本概念

在 CPU 中,寄存器用于临时存储数据和指令执行的中间结果等。不同寄存器有不同用途,比如通用寄存器(如 x86 架构下的eaxebx等)可用于算术运算、数据存储等。当进行协程切换时,需要把当前协程使用的寄存器值保存起来(store操作),切换到另一个协程时再恢复这些寄存器的值(load操作) 。

上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别 mov 到相对应的寄存器上。此时上下文完成切换。如下图所示:在这里插入图片描述

首先来回顾一下 x86_64 寄存器的相关知识。汇编与寄存器相关知识还会在《协程的实 现之切换》继续深入探讨的。x86_64 的寄存器有 16 个 64 位寄存器,分别是:

%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15

  • %rax 作为函数返回值使用的。

  • %rsp 栈指针寄存器,指向栈顶

  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函数参数,依次对应第 1 参数,第 2 参数。。。 %rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改 %r10, %r11 用作数据存储,就是使用前要先保存原值

  • 代码解释

  • store 操作代码

mov eax (co_a.a);
mov ebx (co_a.b);

mov eax (co_a.a):将内存中co_a.a位置的值加载到eax寄存器中。
mov ebx (co_a.b):类似地,将co_a.b位置的值加载到ebx寄存器。

  • load 操作代码
mov (co_b.a) eax;
mov (co_b.b) ebx;

mov (co_b.a) eax:将eax寄存器中的值存储到内存中co_b.a的位置。这里是在恢复协程 b 的上下文,把之前保存(可能是从另一个协程切换过来时保存的)到寄存器的值,放回协程 b 对应的内存位置。
mov (co_b.b) ebx:同理,将ebx寄存器的值存储到co_b.b位置。

  • 寄存器作用

    • eax寄存器:在上述代码中,它先用于临时存储协程 a 的某个数据(co_a.a),在切换到协程 b 时,又用于把这个值恢复到协程 b 对应的位置(co_b.a) 。它是数据传递和保存恢复的一个重要载体。

    • ebx寄存器:和eax类似,用于存储协程 a 的另一个数据(co_a.b),并在切换时恢复到协程 b 的对应位置(co_b.b) 。

  • 完整的简单汇编代码示例(x86 架构,简化示意)

section.data
co_a:a dd 0b dd 0
co_b:a dd 0b dd 0section.text
global _start_start:
; store操作示例,保存协程a部分上下文mov eax, [co_a + 0] ; 假设co_a.a在偏移0处mov ebx, [co_a + 4] ; 假设co_a.b在偏移4处; 这里可以想象进行协程切换的逻辑; load操作示例,恢复协程b部分上下文mov [co_b + 0], eaxmov [co_b + 4], ebxmov eax, 1      ; 系统调用号,退出程序xor ebx, ebx    ; 返回值0int 0x80        ; 触发系统调用

上述代码在数据段定义了两个类似协程结构体的变量co_aco_b,在代码段简单演示了 store 和 load 操作过程。实际应用中,协程切换还需处理栈指针、标志寄存器等更多内容,并且要和高级语言代码配合使用。

5 协程初始启动 - eip 寄存器设置

以 NtyCo 的实现为例,来分析这个过程。CPU 有一个非常重要的寄存器叫做 EIP,用来 存储 CPU 运行下一条指令的地址。我们可以把回调函数的地址存储到 EIP 中,将相应的参数 存储到相应的参数寄存器中。实现子过程调用的逻辑代码如下:

void _exec(nty_coroutine *co) {co->func(co->arg); //子过程的回调函数 }void nty_coroutine_init(nty_coroutine *co) {//ctx 就是协程的上下文co->ctx.edi = (void*)co; //设置参数co->ctx.eip = (void*)_exec; //设置回调函数入口//当实现上下文切换的时候,就会执行入口函数_exec , _exec 调用子过程 func
}

EIP 寄存器的设置是协程切换的核心环节,通过以下步骤实现执行流的转移:

  1. 保存当前 EIP:将当前协程的指令指针存入上下文结构体。
  2. 恢复目标 EIP:从目标协程的上下文结构体中读取 EIP,并通过栈顶和 ret 指令间接设置到 CPU 的 EIP 寄存器中。
  3. 跳转执行ret 指令触发执行流跳转到目标 EIP 指向的指令,完成协程切换。
  • 代码功能总结

代码主要实现了一个用于协程切换的汇编函数_switch,运行在 x86 架构下 。其核心功能是保存当前协程的上下文(栈指针esp、基址指针ebp、指令指针eip以及通用寄存器ebxesiedi等的值 ),并恢复目标协程的上下文,从而实现协程之间的切换。

  • 代码注释详解

切换_switch 函数定义:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

参数 1:即将运行协程的上下文,寄存器列表

参数 2:正在运行协程的上下文,寄存器列表

#ifdef __i386__
__asm__ (
"    .text                                  \n"
"    .p2align 2,,3                          \n"
".globl _switch                             \n"
"_switch:                                   \n"
"__switch:                                  \n"
// 将当前栈指针 + 8 位置处的值(应该是指向当前协程上下文结构体的指针)加载到 %edx 寄存器
"movl 8(%esp), %edx      # fs->%edx         \n" 
// 将当前栈指针 %esp 的值保存到当前协程上下文结构体的 0 偏移位置(用于保存当前协程的栈指针)
"movl %esp, 0(%edx)      # save esp         \n" 
// 将基址指针 %ebp 的值保存到当前协程上下文结构体的 4 偏移位置(用于保存当前协程的基址指针)
"movl %ebp, 4(%edx)      # save ebp         \n" 
// 将当前栈顶的值(应该是返回地址,即指令指针 %eip )加载到 %eax 寄存器
"movl (%esp), %eax       # save eip         \n" 
// 将指令指针 %eip 的值保存到当前协程上下文结构体的 8 偏移位置
"movl %eax, 8(%edx)                         \n" 
// 分别将通用寄存器 %ebx、%esi、%edi 的值保存到当前协程上下文结构体对应的偏移位置
"movl %ebx, 12(%edx)     # save ebx,esi,edi \n" 
"movl %esi, 16(%edx)                        \n" 
"movl %edi, 20(%edx)                        \n" 
// 将目标协程上下文结构体的指针(从栈上获取,当前栈指针 + 4 位置处的值 )加载到 %edx 寄存器
"movl 4(%esp), %edx      # ts->%edx         \n" 
// 从目标协程上下文结构体中恢复通用寄存器 %edi、%esi、%ebx 的值
"movl 20(%edx), %edi     # restore ebx,esi,edi      \n" 
"movl 16(%edx), %esi                                \n" 
"movl 12(%edx), %ebx                                \n" 
// 从目标协程上下文结构体中恢复栈指针 %esp 的值
"movl 0(%edx), %esp      # restore esp              \n" 
// 从目标协程上下文结构体中恢复基址指针 %ebp 的值
"movl 4(%edx), %ebp      # restore ebp              \n" 
// 从目标协程上下文结构体中恢复指令指针 %eip 的值到 %eax 寄存器
"movl 8(%edx), %eax      # restore eip              \n" 
// 将恢复后的指令指针 %eip 的值设置到栈顶,为函数返回后跳转到目标协程的正确位置做准备
"movl %eax, (%esp)                                  \n" 
// 函数返回,此时程序执行流将切换到目标协程的上下文环境继续执行
"ret                                                \n"
);      #ifndef _USE_UCONTEXT
typedef struct _nty_cpu_ctx {void *esp; // 用于存储栈指针void *ebp; // 用于存储基址指针void *eip; // 用于存储指令指针void *edi; // 用于存储通用寄存器 %edi 的值void *esi; // 用于存储通用寄存器 %esi 的值void *ebx; // 用于存储通用寄存器 %ebx 的值void *r1;  // 预留,可能用于扩展存储其他寄存器值或数据void *r2;  // 预留,可能用于扩展存储其他寄存器值或数据void *r3;  // 预留,可能用于扩展存储其他寄存器值或数据void *r4;  // 预留,可能用于扩展存储其他寄存器值或数据void *r5;  // 预留,可能用于扩展存储其他寄存器值或数据
} nty_cpu_ctx;
#endif

按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi保存第二 个参数的值,即保存 cur_ctx 的值。X86_64 每个寄存器是 64bit,8byte。

  • Movq %rsp, 0(%rsi) 保存在栈指针到 cur_ctx 实例的 rsp 项
  • Movq %rbp, 8(%rsi)
  • Movq (%rsp), %rax #将栈顶地址里面的值存储到 rax 寄存器中。Ret 后出栈,执行栈顶 Movq %rbp, 8(%rsi) #后续的指令都是用来保存 CPU 的寄存器到 new_ctx 的每一项中
  • Movq 8(%rdi), %rbp #将 new_ctx 的值
  • Movq 16(%rdi), %rax #将指令指针 rip 的值存储到 rax 中
  • Movq %rax, (%rsp) # 将存储的 rip 值的 rax 寄存器赋值给栈指针的地址的值。 Ret # 出栈,回到栈指针,执行 rip 指向的指令。

上下文环境的切换完成。

6 协程栈空间定义 - 独立栈与共享栈的做法

*问题:协程如何定义?* *调度器如何定义?*

先来一道设计题:

设计一个协程的运行体 R 与运行体调度器 S 的结构体

  1. 运行体 R:包含运行状态{就绪,睡眠,等待},运行体回调函数, 回调参数,栈指针,栈大小,当前运行体
  2. 调度器 S:包含执行集合{就绪,睡眠,等待}

这道设计题拆分两个个问题,一个运行体如何高效地在多种状态集合更换。调度器与运行体的功能界限。

运行体如何高效地在多种状态集合更换
  • 新创建的协程,创建完成后,加入到就绪集合,等待调度器的调度;协程 在运行完成后,进行 IO 操作,此时 IO 并未准备好,进入等待状态集合;IO 准备就绪,协程开始运行,后续进行 sleep 操作,此时进入到睡眠状态集合。
  • 就绪(ready),睡眠(sleep),等待(wait)集合该采用如何数据结构来存储?
  • 就绪(ready)集合并不没有设置优先级的选型,所有在协程优先级一致,所以可以使用队列来存储就绪的协程,简称为就绪队列(ready_queue)。
  • 睡眠(sleep)集合需要按照睡眠时长进行排序,采用红黑树来存储,简称睡眠树(sleep_tree)红黑树在工程实用为<key, value>, key 为睡眠时长,
  • value 为对应的协程结点。
  • 等待(wait)集合,其功能是在等待 IO 准备就绪,等待 IO 也是有时长的, 所以等待(wait)集合采用红黑树的来存储,简称等待树(wait_tree),此处借 鉴 nginx 的设计。

数据结构如下图所示:

在这里插入图片描述

Coroutine 就是协程的相应属性,status 表示协程的运行状态。sleep 与 wait 两颗红黑树,ready 使用的队列,比如某协程调用 sleep 函数,加入睡 眠树(sleep_tree),status |= S 即可。比如某协程在等待树(wait_tree) 中,而 IO 准备就绪放入 ready 队列中,只需要移出等待树(wait_tree),状态更改 status &= ~W 即可。有一个前提条件就是不管何种运行状态的协程, 都在就绪队列中,只是同时包含有其他的运行状态。

调度器与运行体的功能界限
  • 每一协程都需要使用的而且可能会不同属性的,就是协程属性。每一协程都需要的而且数据一致的,就是调度器的属性。

    • 比如栈大小的数值,每个协程都一样的后不做更改可以作为调度器的属性,如果每个协程大小不一致,则可 以作为协程的属性。
  • 用来管理所有协程的属性,作为调度器的属性。

    • 比如 epoll 用来管理每一 个协程对应的 IO,是需要作为调度器属性。
7 协程结构体定义

协程的核心结构体如下:

typedef struct _nty_coroutine {nty_cpu_ctx ctx;proc_coroutine func;void *arg;size_t stack_size;nty_coroutine_status status;nty_schedule *sched;uint64_t birth;uint64_t id;void *stack;RB_ENTRY(_nty_coroutine) sleep_node;RB_ENTRY(_nty_coroutine) wait_node;TAILQ_ENTRY(_nty_coroutine) ready_next;TAILQ_ENTRY(_nty_coroutine) defer_next;
} nty_coroutine;

按照前面几章的描述,定义一个协程结构体需要多少域,我们描述了每一 个协程有自己的上下文环境,

  • 需要保存 CPU 的寄存器 ctx;
  • 需要有子过程的回 调函数 func;
  • 需要有子过程回调函数的参数 arg;
  • 需要定义自己的栈空间stack;
  • 需要有自己栈空间的大小 stack_size;
  • 需要定义协程的创建时间birth;
  • 需要定义协程当前的运行状态 status;
  • 需要定当前运行状态的结点 (ready_next, wait_node, sleep_node);
  • 需要定义协程 id;需要定义调度器的全局对象 sched。

协程结构体用于存储协程的相关信息,如协程的状态(运行、暂停、终止等)、协程的上下文(保存的寄存器值等)、协程的栈指针等。通过定义合理的协程结构体,方便对协程进行管理和操作 。

#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>// 协程ID类型定义
typedef int co_id;// 协程执行函数类型
typedef void *(*coroutine_entry)(void *);// 协程状态枚举
typedef enum {CO_READY,     // 就绪状态CO_RUNNING,   // 运行状态CO_SUSPENDED, // 暂停状态CO_TERMINATED // 终止状态
} co_status;// 协程结构体定义
struct coroutine {co_id id;                // 协程IDucontext_t ctx;          // 协程上下文coroutine_entry func;    // 协程执行函数void *arg;               // 传递给函数的参数void *result;            // 函数返回结果co_status status;        // 协程状态char *stack;             // 协程栈指针size_t stack_size;       // 协程栈大小int fd;                  // 文件描述符(用于IO操作)
};// 协程管理器结构
struct coroutine_manager {struct coroutine **coroutines; // 协程数组int capacity;                  // 协程池容量int count;                     // 当前协程数量co_id current;                 // 当前运行的协程IDucontext_t main_ctx;           // 主上下文
};// 全局协程管理器
static struct coroutine_manager *co_manager = NULL;// 创建新协程
co_id create_coroutine(coroutine_entry entry, void *arg) {if (co_manager == NULL) {// 初始化协程管理器co_manager = (struct coroutine_manager *)malloc(sizeof(struct coroutine_manager));co_manager->capacity = 100;co_manager->count = 0;co_manager->current = -1;co_manager->coroutines = (struct coroutine **)calloc(co_manager->capacity, sizeof(struct coroutine *));}// 创建新协程struct coroutine *co = (struct coroutine *)malloc(sizeof(struct coroutine));co->id = co_manager->count++;co->func = entry;co->arg = arg;co->result = NULL;co->status = CO_READY;co->stack_size = 64 * 1024; // 64KB栈大小co->stack = (char *)malloc(co->stack_size);co->fd = -1;// 保存到协程管理器if (co_manager->count > co_manager->capacity) {// 扩容逻辑co_manager->capacity *= 2;co_manager->coroutines = (struct coroutine **)realloc(co_manager->coroutines, co_manager->capacity * sizeof(struct coroutine *));}co_manager->coroutines[co->id] = co;// 初始化上下文getcontext(&co->ctx);co->ctx.uc_stack.ss_sp = co->stack;co->ctx.uc_stack.ss_size = co->stack_size;co->ctx.uc_link = &co_manager->main_ctx;// 设置协程入口函数makecontext(&co->ctx, (void (*)(void))entry, 1, arg);return co->id;
}// 启动协程调度
void schedule() {if (co_manager == NULL || co_manager->count == 0) {return;}// 简单的轮询调度for (int i = 0; i < co_manager->count; i++) {co_id next = (co_manager->current + 1) % co_manager->count;struct coroutine *co = co_manager->coroutines[next];if (co->status == CO_READY || co->status == CO_SUSPENDED) {co_manager->current = next;co->status = CO_RUNNING;swapcontext(&co_manager->main_ctx, &co->ctx);}}
}// 让出CPU,暂停当前协程
void yield() {if (co_manager == NULL || co_manager->current == -1) {return;}struct coroutine *co = co_manager->coroutines[co_manager->current];co->status = CO_SUSPENDED;swapcontext(&co->ctx, &co_manager->main_ctx);
}// 协程示例函数
void *coroutine_function(void *arg) {int id = *(int *)arg;printf("Coroutine %d started\n", id);for (int i = 0; i < 5; i++) {printf("Coroutine %d running step %d\n", id, i);yield(); // 让出CPU}printf("Coroutine %d finished\n", id);return NULL;
}// 主函数示例
int main() {int args1 = 1, args2 = 2;// 创建两个协程co_id co1 = create_coroutine(coroutine_function, &args1);co_id co2 = create_coroutine(coroutine_function, &args2);// 启动调度schedule();// 清理资源for (int i = 0; i < co_manager->count; i++) {free(co_manager->coroutines[i]->stack);free(co_manager->coroutines[i]);}free(co_manager->coroutines);free(co_manager);return 0;
}
8. 调度器的定义(struct scheduler )

以下为调度器的定义:

typedef struct _nty_coroutine_queue nty_coroutine_queue;typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;typedef struct _nty_schedule { uint64_t birth;nty_cpu_ctx ctx;struct _nty_coroutine *curr_thread;int page_size;int poller_fd;int eventfd;struct epoll_event eventlist[NTY_CO_MAX_EVENTS];int nevents;int num_new_events;nty_coroutine_queue ready;nty_coroutine_rbtree_sleep sleeping;nty_coroutine_rbtree_wait waiting;} nty_schedule;

调度器的属性:

  • 需要有保存 CPU 的寄存器上下文 ctx,
  • 可以从协程运行状态yield 到调度器运行的sleep。
  • 从协程到调度器用yield,
  • 从调度器到协程用 resume

调度器管理协程,一般包含就绪协程队列(存放可执行协程 )、sleep 协程集合(存放处于睡眠等待特定时间的协程 )、运行时协程队列(记录当前正在运行协程 )、等待协程集合(存放等待特定事件,如 IO 事件的协程 ) 。示例代码(伪代码 ):

struct scheduler {queue_t ready_queue;  // 就绪协程队列set_t sleep_set;  // sleep协程集合struct coroutine* running;  // 当前运行协程set_t wait_set;  // 等待协程集合// 其他辅助成员,如epoll相关用于事件监听等int epfd; 
};
9. 调度器的执行策略

调度器的实现,有两种方案,一种是生产者消费者模式,另一种多状态运行。

生产者消费者模式

在这里插入图片描述

while (1) {//遍历睡眠集合,将满足条件的加入到 ready
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) { TAILQ_ADD(&sched->ready, expired);
}//遍历等待集合,将满足添加的加入到 ready
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1); for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
TAILQ_ADD(&sched->ready, wait);
}// 使用 resume 回复 ready 的协程运行权
while (!TAILQ_EMPTY(&sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready); 
}
}
多状态运行

在这里插入图片描述

while (1) {//遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != ) { resume(expired);
}//遍历等待集合,使用 resume 恢复wait 的协程运行权
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1); for (i = 0;i < nready;i ++) {
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}// 使用 resume 恢复 ready 的协程运行权
while (!TAILQ_EMPTY(sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
  • 调度器定期检查sleep集合,查看是否有协程超时,将超时协程移到ready队列
  • 通过事件机制(如 epoll )检查wait集合,当某事件(如某文件描述符可读 )发生时,将相应协程从wait集合取出并恢复执行,移入ready队列
  • ready队列取出协程,切换上下文使其运行。
10. 与 posix api 做到一致

主要是在协程实现过程中,对于涉及 IO 等操作时,尽量模仿或兼容 POSIX API 的行为和语义。比如在处理文件描述符相关操作时,协程中对读写操作的处理方式和返回值约定等与 POSIX 标准的readwrite等函数保持相似,这样可让基于协程的代码能更好与现有的基于 POSIX API 的系统和库集成,减少适配成本。

11. 协程的执行流程
  • 协程创建:分配协程上下文结构体并初始化,设置运行函数、参数等信息,将协程加入调度器管理。
  • 运行:调度器选择一个就绪协程,切换到该协程上下文开始执行。遇到 IO 操作时,协程通过yield(让出执行权 )原语暂停执行,将自己放入等待集合,调度器切换到其他就绪协程执行。
  • 恢复:当 IO 操作完成(通过事件通知,如 epoll 检测到文件描述符就绪 )或等待条件满足,调度器将对应协程从等待集合移到就绪队列,在合适时机恢复其执行,通过resume原语切换回该协程上下文继续运行。
  • 结束:协程执行完函数逻辑或显式调用结束相关操作,从调度器中删除,释放协程上下文资源。
12. 协程的多核模式
  • 多线程 + 协程:在多核 CPU 上,创建多个线程,每个线程内运行多个协程。线程作为 CPU 调度基本单位,利用多核并行能力,而协程在每个线程内进行更细粒度的任务调度,提升单线程内任务执行效率。**(因为多个线程会使用同一个调度器,所以需要考虑线程加锁的问题)**比如一个网络服务器程序,可开多个工作线程,每个线程内用协程处理多个客户端连接请求和数据交互。
  • 亲和性设置:可设置协程或线程与特定 CPU 核心的亲和性,让某个线程或协程固定在某核心上运行,减少跨核心调度带来的缓存失效等开销,提高性能。
13. 协程的性能测试
  • 基准测试:编写简单程序,如多个协程循环执行固定次数的简单计算任务,记录总执行时间,对比不同协程实现或不同参数配置下的性能差异。
  • IO 密集型场景测试:模拟实际 IO 密集场景,如大量网络请求、文件读写操作。记录单位时间内协程处理的请求数量、响应延迟等指标。例如用协程实现 HTTP 服务器,测试其在高并发请求下的吞吐量和平均响应时间。
  • 多核心性能测试:在多核 CPU 环境下,测试不同负载下协程的性能表现,观察是否能有效利用多核资源,如测量随着核数增加,协程任务处理速度的提升幅度是否接近线性。

2.3.2 协程调度器实现与性能测试

1 调度器的定义分析

协程调度器负责决定哪个协程在何时获得执行权,类似于操作系统的进程调度器。它要管理协程的生命周期,包括创建、调度、暂停、恢复和销毁等操作,合理分配 CPU 时间片给各个协程,以实现高效的并发执行 。

2 超时集合、就绪队列、I/O 等待集合的实现
  • 超时集合:用于管理那些设置了超时时间的协程,调度器会定期检查这个集合,当某个协程超时时间到达时,进行相应处理,如唤醒协程执行超时处理逻辑 。
  • 就绪队列:存放已经准备好可以执行的协程,调度器从就绪队列中选择协程来执行,通常会按照一定的调度策略(如先来先服务、优先级调度等)来挑选 。
  • I/O 等待集合:保存因进行 I/O 操作(如网络请求、文件读写)而处于等待状态的协程,当对应的 I/O 操作完成时,将协程从该集合移到就绪队列,使其可被调度执行 。
3 协程调度的执行流程

调度器首先从就绪队列中选取一个协程,通过协程切换原语(如 switch )将执行权交给该协程。协程执行过程中,若遇到 I/O 操作,就将自身加入 I/O 等待集合并主动让出执行权;若设置了超时,调度器会在超时集合中检查并处理。当 I/O 操作完成或超时时间到,协程会被重新放回就绪队列等待下次调度 。

4 协程接口实现、异步流程实现
  • 协程接口实现:为开发者提供创建、启动、暂停、恢复等协程操作的接口,方便在程序中使用协程。这些接口要封装好底层的协程实现细节,让开发者能像调用普通函数一样使用协程 。
  • 异步流程实现:利用协程实现异步操作流程,例如在进行数据库查询、网络请求等耗时操作时,通过协程切换让程序在等待响应时去执行其他任务,而不是阻塞等待,实现异步非阻塞的编程模型 。
5 hook 钩子的实现

Hook 钩子是一种在程序执行流程中的特定位置插入自定义代码的机制。在协程中,可在协程创建、切换、销毁等关键节点设置钩子函数,用于监控协程状态、进行性能统计、添加额外的逻辑处理等。例如,在协程切换时插入钩子函数,记录切换次数和时间,便于分析协程的执行效率 。

#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <ucontext.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>ucontext_t ctx[3]; // 用于存储三个协程的上下文信息
ucontext_t main_ctx; // 用于存储主程序的上下文信息
int count = 0; // 计数器,用于控制协程循环执行的次数// hook相关类型定义
typedef ssize_t (*read_t)(int fd, void *buf, size_t count);
read_t read_f = NULL; // 用于存储原始的read函数指针typedef ssize_t (*write_t)(int fd, const void *buf, size_t count);
write_t write_f = NULL; // 用于存储原始的write函数指针// 重定义read函数,实现对read操作的hook
ssize_t read(int fd, void *buf, size_t count) {ssize_t ret = read_f(fd, buf, count); // 调用原始的read函数进行读取操作printf("read: %s\n", (char *)buf); // 打印读取到的内容return ret; // 返回读取的字节数
}// 重定义write函数,实现对write操作的hook
ssize_t write(int fd, const void *buf, size_t count) {printf("write: %s\n", (const char *)buf); // 打印要写入的内容return write_f(fd, buf, count); // 调用原始的write函数进行写入操作
}void init_hook(void) {if (!read_f) {// 通过dlsym获取下一个(原始的)read函数指针read_f = dlsym(RTLD_NEXT, "read"); }if (!write_f) {// 这里存在错误,应该获取write函数指针,却错误地获取read函数指针write_f = dlsym(RTLD_NEXT, "write"); }
}
  1. ssize_t 类型的作用

ssize_t 是 C 语言中用于表示带符号的字节数的类型,通常用于系统调用的返回值:

  • 正数:表示成功读取 / 写入的字节数
  • 0:表示文件结束(EOF)
  • -1:表示错误(通过 errno 获取具体错误码)

read/write 系统调用中,返回值可能为负(表示错误),因此使用有符号整数类型 ssize_t 而非无符号的 size_t

2. 函数指针的作用

这段代码中的函数指针(如 read_twrite_t)用于存储原始系统函数的地址,以便在 hook 函数中调用它们。具体步骤:

  1. 保存原始函数地址:通过 dlsym(RTLD_NEXT, "read") 获取系统原本的 read 函数地址
  2. 替换原始函数:定义同名的 read 函数,在其中调用保存的原始函数
  3. 添加额外逻辑:在调用前后添加自定义代码(如日志记录)

这是典型的函数钩子(Function Hook) 实现方式。

3. 回调函数在哪里?

在这段代码中,没有显式的回调函数。回调函数通常用于异步操作,而此处的 hook 是同步的。代码中的 read/write 函数直接调用原始系统函数并返回结果。

如果需要实现异步操作,可以在 hook 函数中注册回调(例如通过 aio_readlibevent),但当前代码并未涉及这部分。

6 协程实现 mysql 请求

利用协程实现对 MySQL 数据库的异步请求。当发起 MySQL 请求时,协程不会阻塞等待结果返回,而是切换去执行其他任务。等 MySQL 数据库返回结果后,再通过调度器将对应协程恢复执行,处理返回的数据,提高程序处理数据库操作的并发能力 。

7 协程多核方案分析

在多核 CPU 环境下,要考虑如何充分利用多核资源来提高协程的执行效率。比如可将不同的协程分配到不同的 CPU 核心上执行,避免单核瓶颈;或者采用多核调度策略,让协程在多核间合理迁移,平衡负载,充分发挥多核 CPU 的性能优势 。

8 协程性能测试

通过各种性能测试工具和方法,对协程的性能进行评估。例如,测试协程的创建、切换开销,协程并发处理任务时的吞吐量、响应时间等指标,分析协程在不同负载下的性能表现,找出性能瓶颈并进行优化 。

「作业:测试协程框架的百万并发」

测试环境:4 台 VMWare 虚拟机 1 台服务器 6G 内存,4 核 CPU 3 台客户端 2G 内存,2 核 CPU 操作系统:ubuntu 14.04

服务器端测试代码:https://github.com/wangbojing/NtyCo 客户端测试代码:

https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport _epoll.c

按照每一个连接启动一个协程来测试。每一个协程栈空间 4096byte 6G 内存 -> 测试协程数量 100W 无异常。并且能够正常收发数据。

「协程如何写到简历中去」

例如:项目描述:图床产品用于存储和管理图片资源,为用户提供便捷的图片上传、存储及访问服务。项目中使用 ntyco 网络框架搭建 webserver,利用其协程特性实现高并发网络请求处理;基于 dkvstore 组件构建 kv 存储模块,用于存储图片元数据等信息;同时集成 redis 进行缓存加速,借助 mediahub 进行多媒体资源管理与分发。

下一章:2.4 自研框架:基于 dpdk 的用户态协议栈的实现

http://www.xdnf.cn/news/14688.html

相关文章:

  • 原型设计Axure RP网盘资源下载与安装教程共享
  • 【记录】服务器多用户共享Conda环境——Ubuntu24.04
  • 进阶向:Django入门,从零开始构建一个Web应用
  • Word之电子章制作——1
  • kubectl exec 原理
  • 力扣第73题-矩阵置零
  • Flutter基础(Children|​​Actions|Container|decoration|child)
  • git使用详解和示例
  • 【区块链】区块链交易(Transaction)之nonce
  • 【Docker基础】Docker容器管理:docker stats及其参数详解
  • C++共享型智能指针std::shared_ptr使用介绍
  • 机器学习配置环境
  • 某音Web端消息体ProtoBuf结构解析
  • 力扣 刷题(第七十一天)
  • 第七章——一元函数微分学的物理应用
  • 多表连接查询:语法、注意事项与最佳实践
  • 如何快速学习一门新编程语言
  • 【Linux】理解进程状态与优先级:操作系统中的调度原理
  • STM32HAL 旋转编码器教程
  • 自定义上下两个方向的柱形图
  • Vue.js 中的数字格式化组件:`FormattedNumber`
  • Note2.4 机器学习:Batch Normalization Introduction
  • 栅极驱动器选的好SiC MOSFET高效又安全
  • Microsoft AZ-900AI-900考证速过经验分享
  • docker部署后端服务的脚本
  • 大模型在急性冠脉综合征预测及诊疗方案制定中的应用研究
  • 大数据在UI前端的应用创新研究:用户偏好的动态调整与优化
  • JavaScript中Object()的解析与应用
  • 免费AI助手工具深度测评:Claude4本地化部署与实战应用指南
  • Spring Boot 项目实训 - 图书信息网站