【网络编程】NtyCo协程服务器的框架(轻量级的协程方案,人称 “小线程”)
文章目录
- 简介
- 引入
- 代码分析
- 什么是协程
- 上下文的定义
- I/O 调度器
- 计算调度器
- 协程让出 CPU,完成异步 I/O 的关键一步
- 创建协程与协程调度器
- 协程的运行方式
推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接。
简介
NtyCo 是一个用纯C语言实现的协程框架,主要用于提升网络编程中的IO操作性能。这个 NTYCO 网络 IO 方案是 GITHUB 上的一个开源项目 :https://github.com/wangbojing/NtyCo
它是一款个人开源的 C 语言协程网络框架,作者王博靖(GitHub ID:wangbojing),2019 年起以个人名义在 GitHub 上迭代,对标的是腾讯开源的 libco / libgo 系列 C++ 协程库,主打“纯 C、零依赖、单线程百万并发”。没错,该项目的作者就是零声教育的 KING 老师。
本博客只作代码的部分解析解读,并不展示全部,读者可自行下载,尝试得出自己的答案。
NTYCO 的核心功能
- 协程实现:NtyCo通过汇编指令实现协程的上下文切换,支持高效的协程切换。
- IO异步操作:结合epoll机制,NtyCo能够实现高效的IO异步操作,减少阻塞等待时间。
- 调度器功能:内置调度器管理协程的运行状态,支持就绪、等待和睡眠三种状态的协程。
NtyCo 之所以被提出是因为传统的 Reactor 网络 IO 模型(我曾经写过的关于 Reactor 网络框架 的博客链接在这)是同步的,一个连接套接字读取信息后是需要立刻要执行信息输出的。读者可以注意到主函数处执行网络 IO
while (1) { // mainloop 服务器的根本struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, 5); // 5 指代等待 5 毫秒,如果 I/O 响应满员的话,立即返回// 该通知函数是需要调用系统内核的,是需要花费成本的,如果只是通知某些内容没有读完而调用,是极其得不偿失的,这是我们需要用到边沿触发的原因,只是编程难度更大了// 我们要重视操作系统内核的调动,系统 I/O 并不全体现在代码之上,而代码要考虑操作系统内核可能出现的情况;编写代码要看到代码之外的东西int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;int is_listener = 0; // 状态机-重置// 当连接异常断开时未清理资源,添加错误处理:if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { // 要注意 “|” 是按位或操作,能进行掩码叠加;“&” 是按位与操作// EPOLLHUP 表示对应的文件描述符被挂断。EPOLLERR 表示对应的文件描述符发生错误。close_connection(connfd);continue;}// 检查是否为监听套接字for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}if (is_listener) continue; // 状态机-进入下一个环节// 处理读事件if (events[i].events & EPOLLIN) {// 这是监控到了除 sockfd 以外的套接字// ET 边沿模式需循环读取,原因是要保证网络 I/O 所有内容都被读取!if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {// 连接在 recv 函数处早已释放continue; // 跳过后续处理}/////////////////////// 读操作的业务端(start) ////////////////////////// 至此,客户端的请求报文全部写完了,写入了 rbuffer 之中。我们要利用这个内存去执行业务操作// 实现 HTTP 请求,在代码里面,该函数只是形式上存在,并不是重点// http_request(&conn_list[fd]);// WebSocket 协议的请求// ws_request(&conn_list[fd]);/////////////////////// 读操作的业务端(start) ////////////////////////set_event(connfd, EPOLLOUT, 0);} else if (events[i].events & EPOLLOUT) {// 这里必须注意 EPOLLOUT 不是边沿事件触发模式,我们通常只把响应报文的 header 写入 wbuffer 中,长度是固定的,因此水平出发即可。// 至于那大段大段的文件资源传输,则是通过文件描述符之间操作完成,跳过缓冲区读写// send 会因无输输出而阻断// 函数 send: 发送固定字节的内容。// 返回值 > 0:表示成功发送了数据,返回值表示实际发送的字节数。// 返回值 == -1:表示发送操作失败。错误原因可以通过 errno 获取conn_list[connfd].send_callback(connfd);// 如果是为了测试百万并发,则用这个,不要把 fd 关闭set_event(connfd, EPOLLIN | EPOLLET, 0); // 这次输出发送事件结束了;改为 “边沿读写模式”,执行 epoll_ctl 函数,系统内核会直接把 fd 加载到 epoll 的就绪集之中} else {printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);close_connection(connfd);}}}————————————————
版权声明:本文为CSDN博主「啟明起鸣」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_40901147/article/details/148832206
读操作 conn_list[connfd].send_callback(connfd);
与写操作conn_list[connfd].send_callback(connfd);
是连在一起执行的。
这一点极其不好,如果是要执行计算密度高、输出量大的任务,那就会造成连接的拥塞。这是单线程 EPOLL 事件驱动的 REACTOR 模型的弊端——同步 IO !
如果同步方案不好的话,那就异步咯~~。可是,我曾经在文章 《【网络编程】多路复用的网络 I/O 服务器(C代码),select、poll 和多线程共三个版本》 里指出线程是开销很大的一种计算机资源,8M 一个线程内存栈,而一个线程伺候一个连接的话,我们有百万连接的设计,那也就是说我们需要内存 8000 多个 G 的内存。我们断然无法接受。
慢着!!异步方案一定需要线程的吗?不一定的,我们还有协程这个东西,我们可以做一个纯用户态的协程,wangbojing 的方案是纯 C 语言的框架。
维度 | NtyCo | libco/libgo |
---|---|---|
语言 | 纯 C | C++03 / C++17 |
依赖 | 0(标准库即可) | boost / C++ runtime |
栈大小 | 8 KB 固定 | 64 KB 起跳 |
单线程并发 | 官方 120 万 | 官方 50 万 |
代码行数 | 5 k | 50 k+ |
引入
我在博客《【网络编程】简易的 p2p 模型,实现两台虚拟机之间的简单点对点通信,并以小见大观察 TCP 协议的具体运行》 里面介绍过,“一个服务器的功能分两类,一是主动类,另一个是被动类”。所谓的主动类,就是根据用户的输入主动去执行计算任务并且完成输出。所谓的被动类,就是随时就绪 stand-by,随时准备接受来自客户端不知道什么时候发来的消息。
对于被动类的功能,最适合的就是使用等待红黑树全局管理全部连接,使用 EPOLL内核态去实现快速的事件监听,然后接受所有的消息。(有人可能会问,EPOLL 也维护了一个红黑树管理全部连接,要注意的是这个红黑树只有事件+sockfd,无法装载那么多信息)
对于主动类的功能,最适合的就是扔给睡眠红黑树延时延迟,时间到了就加入就绪队列里面,挨个调度执行。或者直接进入就绪队列执行。总之,绝不能再由 EPOLL 事件驱动了,需要用户自己去处理,延时任务交给睡眠树,即时任务交给就绪队列去执行。
NTYCO 的作用是切分了连接的三种状态——就绪执行、睡眠延时、随时就绪等待
任何一个连接都是绑定了一个轻量级的协程,而协程就在这三种状态之间转换。三种状态是互斥的,在就绪队列就不会在等待树和睡眠树。
代码分析
我只做部分代码解析,不粘贴所有的代码,读者请参考 https://github.com/wangbojing/NtyCo。
什么是协程
在源代码之中是这么定义的
typedef struct _nty_coroutine {//privatenty_cpu_ctx ctx; // 协程上下文,保存寄存器状态proc_coroutine func; // 任务协程函数void *arg; // 任务函数参数void *data; // 协程私有栈内存指针size_t stack_size; // 栈总大小(字节)size_t last_stack_size; // 上次记录的栈使用量(用于内存优化)nty_coroutine_status status; // 协程状态nty_schedule *sched; // 所属调度器的指针uint64_t birth; // 创建时间戳(微秒)uint64_t id; // 协程唯一IDint fd; // 关联的文件描述符unsigned short events; // 关注的事件(POLLIN/POLLOUT)char funcname[64]; // 协程函数名(用于调试)struct _nty_coroutine *co_join; // 等待 join 的协程void **co_exit_ptr; // 协程退出值指针void *stack; // 协程私有栈void *ebp; // %rbp 寄存器的数据指针uint32_t ops; // 操作计数器(用于优先级调整)uint64_t sleep_usecs; // 睡眠时间(微秒)RB_ENTRY(_nty_coroutine) sleep_node; // 睡眠红黑树节点(按唤醒时间排序)RB_ENTRY(_nty_coroutine) wait_node; // 等待IO红黑树节点(按fd排序)LIST_ENTRY(_nty_coroutine) busy_next; // 忙碌协程链表节点TAILQ_ENTRY(_nty_coroutine) ready_next; // 就绪队列节点TAILQ_ENTRY(_nty_coroutine) defer_next; // 延迟队列节点TAILQ_ENTRY(_nty_coroutine) cond_next; // 条件变量队列节点TAILQ_ENTRY(_nty_coroutine) io_next; // IO队列节点TAILQ_ENTRY(_nty_coroutine) compute_next; // 计算队列节点// IO 操作状态struct {void *buf; // 数据缓冲区size_t nbytes; // 数据长度int fd; // 操作的文件描述符int ret; // 操作返回值int err; // 错误码} io;// nty_schedule_sched_wait struct _nty_coroutine_compute_sched *compute_sched; // 专用计算调度器int ready_fds; // 就绪的文件描述符数量struct pollfd *pfds; // poll 使用的文件描述符数组nfds_t nfds; // poll 文件描述符数量
} nty_coroutine;
这个用户态协程需要模仿线程,拥有
1、任务函数、执行任务函数所需要的参数。一个协程终其一生只能做一个任务。
2、协程的上下文。记录一个协程做到了任务函数的哪一步,用于恢复原样。
3、专用于 CPU 计算推理任务的调度器 _nty_coroutine_compute_sched
,保存计算结果的缓冲区(即数据缓冲区)。计算调度器占一个线程,执行具体计算任务可用多个线程,占用其他 CPU,使用线程锁保护数据。这里不赘述。
4、专门用于 I/O 任务的普通调度器 nty_schedule *sched
。
5、以及管理协程状态的数据结构(比如睡眠//等待红黑树、延迟//就绪队列)。比如,RB_ENTRY(_nty_coroutine) sleep_node
。
其余的还有杂七杂八的维护信息,比如协程 ID 、创建时间、协定的睡眠时间。
协程就是「可以暂停 / 继续执行的函数」——它让程序在同一个线程里自己决定“我先让出 CPU,等会儿再回来”,既不用线程切换,也不必把逻辑拆成回调。
上下文的定义
CPU 是通过寄存器去记住所传入的参数、执行指令栈、传出参数。而汇编语言是可以直接操作这些的设备零件的。
typedef struct _nty_cpu_ctx {void *esp; //保存 %rspvoid *ebp; //保存 %rbpvoid *eip; //获取栈顶存储的返回地址 rip, 这个返回地址就是协程被切换时的指令位置void *edi; //保存 %rbpvoid *esi; //保存 %r12void *ebx; //保存 %r13void *r1; //保存 %r14void *r2; //保存 %r15void *r3; //没用到void *r4; //没用到void *r5; //没用到
} nty_cpu_ctx;
寄存器 | 合同条款 |
---|---|
%rdi, %rsi, %rdx, %rcx, %r8, %r9 | 调用者 → 被调者 传参用;调用者填好即可 |
%rax | 返回值;被调者把它填好返回给调用者 |
%rbx, %rbp, %r12–%r15 | 被调者负责保存;如果被调者要用,先 push,返回前 pop |
%r10, %r11 | 调用者负责保存;随便用,不保护也行 |
上下文切换的函数
#define __volatile__ // 让 GCC 编译器不要优化汇编指令的顺序int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);__asm__ __volatile__ (
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) \n"
" ret \n"
);static void _exec(void *lt) {nty_coroutine *co = (nty_coroutine*)lt;co->func(co->arg); // 开始执行任务// 在执行完任务后,状态标记为退出+结束+离开co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH));// 任务函数执行完后,协程让出nty_coroutine_yield(co); // 切换任务协程}// 协程切换
void nty_coroutine_yield(nty_coroutine *co) {co->ops = 0;_switch(&co->sched->ctx, &co->ctx);
}// 协程恢复——恢复任务执行
// 初始化新协程/加载栈 → 上下文切换 → 状态检查
int nty_coroutine_resume(nty_coroutine *co) {if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {nty_coroutine_init(co);} // 一个线程唯一地拥有一个调度器nty_schedule *sched = nty_coroutine_get_sched(); // 从获取调度器的指针sched->curr_thread = co; // 调度器里的当前处理协程_switch(&co->ctx, &co->sched->ctx); // 切换上下文————相当于在执行函数nty_coroutine_madvise(co); // 协程栈的内存优化sched->curr_thread = NULL; // 清空当前运行协程标记 if (co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) { // 协程的退出状态if (co->status & BIT(NTY_COROUTINE_STATUS_DETACH)) {nty_coroutine_free(co); // 协程退出后,若为 DETACH 状态则立即释放资源。}return -1;} return 0;
}// 协程的初始化函数
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **)(co->stack + co->stack_size); // 栈顶指针stack[-3] = NULL;stack[-2] = (void *)co;co->ctx.esp = (void*)stack - (4 * sizeof(void*)); // 取出寄存器的信息 %rsp,这只是形式上的,因为一开始根本没有记录上下文co->ctx.ebp = (void*)stack - (3 * sizeof(void*)); // 取出寄存器信息 %rbpco->ctx.eip = (void*)_exec; // 获取栈顶存储的返回地址 rip, 这个返回地址就是协程被切换时的指令位置co->status = BIT(NTY_COROUTINE_STATUS_READY); // 本协程已就绪}
I/O 调度器
typedef struct _nty_schedule {uint64_t birth; // 调度器创建时间戳(微秒级)nty_cpu_ctx ctx; // 调度器上下文void *stack; // 调度器的主栈内存指针(核心字段)size_t stack_size; // 栈大小int spawned_coroutines; // 当前创建的协程总数uint64_t default_timeout; // 默认超时时间(微秒)struct _nty_coroutine *curr_thread; // 指向当前正在运行的协程int page_size; // 系统内存页大小(通常4KB)int poller_fd; // epoll实例的文件描述符int eventfd; // 用于唤醒epoll_wait的事件fdstruct epoll_event eventlist[NTY_CO_MAX_EVENTS]; // epoll返回的事件数组int nevents; // 当前待处理事件数量int num_new_events; // 新增事件计数pthread_mutex_t defer_mutex; // 延迟队列的互斥锁nty_coroutine_queue ready; // 就绪队列(可立即运行的协程)nty_coroutine_queue defer; // 延迟队列(稍后执行的协程)nty_coroutine_link busy; // 活跃协程链表nty_coroutine_rbtree_sleep sleeping; // 睡眠协程红黑树(按唤醒时间排序)nty_coroutine_rbtree_wait waiting; // 等待IO协程红黑树(按fd排序)//private } nty_schedule;
计算调度器
一个协程一生只会做一件事, I/O 协程想要完成计算任务只能委托其他协程去做。做完是可以通知委托协程的
// 计算调度器
typedef struct _nty_coroutine_compute_sched {nty_cpu_ctx ctx; // 自定义汇编上下文nty_coroutine_queue coroutines; // 协程队列 (coroutines)nty_coroutine *curr_coroutine; // 当前协程指针 (curr_coroutine)// 运行控制同步pthread_mutex_t run_mutex;pthread_cond_t run_cond;// 协程队列锁 (co_mutex)pthread_mutex_t co_mutex; // 保护协程队列的并发访问LIST_ENTRY(_nty_coroutine_compute_sched) compute_next; // 将多个计算调度器组织成链表nty_coroutine_compute_status compute_status; // 计算状态 (compute_status)
} nty_coroutine_compute_sched;
协程让出 CPU,完成异步 I/O 的关键一步
nty_poll_inner
函数内置了协程调度函数 nty_coroutine_yield
// 这是一个事件转换函数,因为我们使用poll的超时计算,传入 POLL 事件,传出的是 EPOLL 事件
static uint32_t nty_pollevent_2epoll( short events )
{uint32_t e = 0; if( events & POLLIN ) e |= EPOLLIN;if( events & POLLOUT ) e |= EPOLLOUT;if( events & POLLHUP ) e |= EPOLLHUP;if( events & POLLERR ) e |= EPOLLERR;if( events & POLLRDNORM ) e |= EPOLLRDNORM;if( events & POLLWRNORM ) e |= EPOLLWRNORM;return e;
}
static short nty_epollevent_2poll( uint32_t events )
{short e = 0; if( events & EPOLLIN ) e |= POLLIN;if( events & EPOLLOUT ) e |= POLLOUT;if( events & EPOLLHUP ) e |= POLLHUP;if( events & EPOLLERR ) e |= POLLERR;if( events & EPOLLRDNORM ) e |= POLLRDNORM;if( events & EPOLLWRNORM ) e |= POLLWRNORM;return e;
}
/** nty_poll_inner --> 1. sockfd--> epoll, 2 yield, 3. epoll x sockfd* fds : * * POLL 最出名的是其超时处理*/
// 以下的这个函数其实是发生在任何一个读任务之前的,毕竟读任务其实是一个被动技能,他会让这个协程让出给调度器,
// 并且进入睡眠状态,等待后台把事情给完成了再让这个写成醒过来
static int nty_poll_inner(struct pollfd *fds, nfds_t nfds, int timeout) {// nfds_t 是在 <poll.h> 头文件中定义的,本质是一个无符号长整型if (timeout == 0){return poll(fds, nfds, timeout); // 超时参数 timeout 的单位是 毫秒 (milliseconds)。}if (timeout < 0){timeout = INT_MAX; // 意为无限阻塞}nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) {printf("scheduler not exit!\n");return -1;}nty_coroutine *co = sched->curr_thread;// int i = 0;for (i = 0;i < nfds;i ++) {struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events); // 这个 event 将会是 POLLOUT | POLLERR | POLLHUPev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev); co->events = fds[i].events;nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout);}nty_coroutine_yield(co); // 控制权交还给调度器,当然上下文也被保存了// 什么情况下这个写成才会回归呢?for (i = 0;i < nfds;i ++) {struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev); // 这个 fd 已经把消息都读完了,之后就不需要 epoll 了nty_schedule_desched_wait(fds[i].fd);}return nfds;
}
nty_poll_inner
函数将会用在各种网络函数的封装上,保证在执行下一个 I/O 之前,先把 CPU 让出,但又把命令全部都记住了。这就好像一个学生,在考试前偷偷把考试题目背下来。比如,下面的 nty_recv 就是 NTYCO 改良后的 recv 函数
ssize_t nty_recv(int fd, void *buf, size_t len, int flags) {struct pollfd fds;fds.fd = fd;fds.events = POLLIN | POLLERR | POLLHUP;nty_poll_inner(&fds, 1, 1); // 超时参数 timeout 的单位是 毫秒 (milliseconds)。这里说明了 recv 的超时时间是 1 毫秒(10 的负三次方)int ret = recv(fd, buf, len, flags);if (ret < 0) {//if (errno == EAGAIN) return ret;if (errno == ECONNRESET) return -1;//printf("recv error : %d, ret : %d\n", errno, ret);}return ret;
}
创建协程与协程调度器
需要清楚的是,先创建协程协程,发现没有调度器再顺便创造,也就是说协程调度器的创建是被动创建的。首先是创建协程,
// coroutine -->
// create
// 协程创建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);// 使用 pthread_once 确保全局调度器键值 (global_sched_key) 只初始化一次// 首次调用时触发 nty_coroutine_sched_key_creator 创建 TLS 键// 获取当前线程的调度器nty_schedule *sched = nty_coroutine_get_sched();// 惰性初始化:每个线程首次创建协程时自动创建专属调度器// 参数 0 表示使用默认栈大小(通常为 128KB)if (sched == NULL) {nty_schedule_create(0);sched = nty_coroutine_get_sched();if (sched == NULL) {printf("Failed to create scheduler\n");return -1; // 错误码 -1: 调度器创建失败}}nty_coroutine* co = calloc(1, sizeof(nty_coroutine));if (co == NULL) {printf("Failed to allocate memory for new coroutine\n");return -2;}// 自定义汇编模式:手动分配对齐的栈内存,posix_memalign 是 POSIX 标准提供的 按指定对齐方式分配动态内存 的函数。// getpagesize() 是 POSIX 提供的系统调用/库函数,用来获取当前系统 内存页(page) 的大小(字节数)。// 内存页就是操作系统管理物理内存的最小单位,把物理内存想成一本很厚的草稿本,页就是固定大小的活页纸。int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);// 内存对齐:按页大小对齐(通常 4KB),避免 MMU 分页错误// 手动管理:后续需要显式释放 (free(co->stack))if (ret) {printf("Failed to allocate stack for new coroutine\n");free(co);return -3;}co->stack_size = sched->stack_size; // 协程栈大小等于调度器栈的大小,128 KBco->sched = sched; // 就像线程池与工作线程互持股份一样,是为了相互通信co->status = BIT(NTY_COROUTINE_STATUS_NEW); // 创建状态co->id = sched->spawned_coroutines ++; // 只要创建一个协程,它的 ID 号就是当前所建立的协程数 +1co->func = func; // 协程的任务函数co->fd = -1; // 初始化文件描述符co->events = 0; // 初始化事件掩码co->arg = arg; // 保存用户参数co->birth = nty_coroutine_usec_now(); // 记录创建时间戳*new_co = co; // 返回创建的协程指针// ── 加入调度队列(双向链表队列) ──────────────────────TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);return 0;
}
紧接着是,调度器创建
int nty_schedule_create(int stack_size) {int sched_stack_size = stack_size ? stack_size : NTY_CO_MAX_STACKSIZE;nty_schedule *sched = (nty_schedule*)calloc(1, sizeof(nty_schedule));if (sched == NULL) {printf("Failed to initialize scheduler\n");return -1;}// pthread_setspecific(global_sched_key, sched) 是专门为调度器 sched 设置一把钥匙assert(pthread_setspecific(global_sched_key, sched) == 0);// 调度器掌握 EPOLL 的套接字管理 epfd sched->poller_fd = nty_epoller_create();if (sched->poller_fd == -1) {printf("Failed to initialize epoller\n");nty_schedule_free(sched);return -2;}nty_epoller_ev_register_trigger(); // 调度器 sched 拥有计时器sched->stack_size = sched_stack_size; // 调度器栈的大小为 128 KBsched->page_size = getpagesize(); // 调度器的页大小为 4 KBsched->stack = NULL;bzero(&sched->ctx, sizeof(nty_cpu_ctx)); // 初始化当前协程的上下文sched->spawned_coroutines = 0; // 已建立多少个协程sched->default_timeout = 3000000u; // 把调度器的默认超时设成 3 秒。RB_INIT(&sched->sleeping); // 初始化睡眠树RB_INIT(&sched->waiting); // 初始化等待树sched->birth = nty_coroutine_usec_now(); // 调度器的创立计时TAILQ_INIT(&sched->ready); // 就绪队列TAILQ_INIT(&sched->defer); // 延迟队列LIST_INIT(&sched->busy); // 忙链表}
协程的运行方式
调度器与协程之间相互协调,协程执行后让出 CPU 给调度器,调度器查看睡眠树的协程是否苏醒了,就序列表里面是否准备好了,等待树里面是否有突发情况。如果有的话,挨个让出 CPU 给这些协程。普通的单线程执行 I/O 任务就够了,如果遇上计算密集型,那得在设计计算调度器的调度函数(源代码里面并没有关于计算调度器的运行代码实例)。
void nty_schedule_run(void) {nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) return ;// 执行协程有三个来源:睡眠树+就绪队列+等待红黑树,如此循环着来(有优先级)while (!nty_schedule_isdone(sched)) { // 如果调度器没有启动的话// 1. expired --> sleep rbtree 第一步:处理超时协程(睡眠红黑树),就是观察预定的时间到了与否nty_coroutine *expired = NULL;while ((expired = nty_schedule_expired(sched)) != NULL) { // 只移除那些 “成熟” 的节点nty_coroutine_resume(expired); // 这个是在 nty_coroutine.c 里面定义的,从睡眠红黑树摘下来后,就会被拿去立即执行}// 2. ready queue 第二步:处理就绪协程队列,全部处理,所有刚生成的协程都会进入这个队列里面nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);while (!TAILQ_EMPTY(&sched->ready)) {nty_coroutine *co = TAILQ_FIRST(&sched->ready);TAILQ_REMOVE(&co->sched->ready, co, ready_next);if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) { // 协程状态为结束的话,释放协程资源nty_coroutine_free(co);break;}nty_coroutine_resume(co); // 立即执行if (co == last_co_ready) break;}// 3. wait rbtree 等待树,这里绕了一圈,用户发送数据,EPOLL 只有可能接受 EPOLLIN 事件(不判断),从等待树里找到协程,协程再给出上下文(相当于未雨绸缪)nty_schedule_epoll(sched); // 超时为 t 的 EPOLL 实例识别 IO 事件的出发,并且内容已经保存在了调度器 sched 上面while (sched->num_new_events) { // 有限次循环int idx = --sched->num_new_events;struct epoll_event *ev = sched->eventlist+idx;int fd = ev->data.fd;int is_eof = ev->events & EPOLLHUP; // IO 文件被挂断,这是 Linux 内核负责标记的if (is_eof) errno = ECONNRESET;nty_coroutine *co = nty_schedule_search_wait(fd);if (co != NULL) {if (is_eof) {co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF); // 协程也被打上标签,执行最后这一次了}nty_coroutine_resume(co); // 立即执行,这不是回调函数}is_eof = 0;}}nty_schedule_free(sched);return ;
}
我们观察到计算等待树的协程触发是需要 EPOLL 事件去驱动的,因为等待树意味着不确定的外界输入。