当前位置: 首页 > news >正文

【网络编程】NtyCo协程服务器的框架(轻量级的协程方案,人称 “小线程”)

文章目录

  • 简介
  • 引入
  • 代码分析
    • 什么是协程
    • 上下文的定义
    • I/O 调度器
    • 计算调度器
    • 协程让出 CPU,完成异步 I/O 的关键一步
    • 创建协程与协程调度器
    • 协程的运行方式

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接。

简介

NtyCo 是一个用纯C语言实现的协程框架,主要用于提升网络编程中的IO操作性能。这个 NTYCO 网络 IO 方案是 GITHUB 上的一个开源项目 :https://github.com/wangbojing/NtyCo

它是一款个人开源的 C 语言协程网络框架,作者王博靖(GitHub ID:wangbojing),2019 年起以个人名义在 GitHub 上迭代,对标的是腾讯开源的 libco / libgo 系列 C++ 协程库,主打“纯 C、零依赖、单线程百万并发”。没错,该项目的作者就是零声教育的 KING 老师。

本博客只作代码的部分解析解读,并不展示全部,读者可自行下载,尝试得出自己的答案。

NTYCO 的核心功能

  • 协程实现:NtyCo通过汇编指令实现协程的上下文切换,支持高效的协程切换。
  • IO异步操作:结合epoll机制,NtyCo能够实现高效的IO异步操作,减少阻塞等待时间。
  • 调度器功能:内置调度器管理协程的运行状态,支持就绪、等待和睡眠三种状态的协程。

NtyCo 之所以被提出是因为传统的 Reactor 网络 IO 模型(我曾经写过的关于 Reactor 网络框架 的博客链接在这)是同步的,一个连接套接字读取信息后是需要立刻要执行信息输出的。读者可以注意到主函数处执行网络 IO

while (1) { // mainloop	服务器的根本struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, 5);	//	5 指代等待 5 毫秒,如果 I/O 响应满员的话,立即返回//	该通知函数是需要调用系统内核的,是需要花费成本的,如果只是通知某些内容没有读完而调用,是极其得不偿失的,这是我们需要用到边沿触发的原因,只是编程难度更大了//	我们要重视操作系统内核的调动,系统 I/O 并不全体现在代码之上,而代码要考虑操作系统内核可能出现的情况;编写代码要看到代码之外的东西int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;int is_listener = 0;	//	状态机-重置// 当连接异常断开时未清理资源,添加错误处理:if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) {	// 要注意 “|” 是按位或操作,能进行掩码叠加;“&” 是按位与操作//  EPOLLHUP 表示对应的文件描述符被挂断。EPOLLERR 表示对应的文件描述符发生错误。close_connection(connfd);continue;}// 检查是否为监听套接字for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}if (is_listener) continue;	//	状态机-进入下一个环节// 处理读事件if (events[i].events & EPOLLIN) {// 这是监控到了除 sockfd 以外的套接字// ET 边沿模式需循环读取,原因是要保证网络 I/O 所有内容都被读取!if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {//	连接在 recv 函数处早已释放continue; // 跳过后续处理}///////////////////////		读操作的业务端(start)     //////////////////////////	至此,客户端的请求报文全部写完了,写入了 rbuffer 之中。我们要利用这个内存去执行业务操作//	实现 HTTP 请求,在代码里面,该函数只是形式上存在,并不是重点//	http_request(&conn_list[fd]);//	WebSocket 协议的请求// 	ws_request(&conn_list[fd]);///////////////////////		读操作的业务端(start)     ////////////////////////set_event(connfd, EPOLLOUT, 0);} else if (events[i].events & EPOLLOUT) {//	这里必须注意 EPOLLOUT 不是边沿事件触发模式,我们通常只把响应报文的 header 写入 wbuffer 中,长度是固定的,因此水平出发即可。//	至于那大段大段的文件资源传输,则是通过文件描述符之间操作完成,跳过缓冲区读写// send 会因无输输出而阻断// 函数 send: 发送固定字节的内容。// 返回值 > 0:表示成功发送了数据,返回值表示实际发送的字节数。// 返回值 == -1:表示发送操作失败。错误原因可以通过 errno 获取conn_list[connfd].send_callback(connfd);//	如果是为了测试百万并发,则用这个,不要把 fd 关闭set_event(connfd, EPOLLIN | EPOLLET, 0);	// 这次输出发送事件结束了;改为 “边沿读写模式”,执行 epoll_ctl 函数,系统内核会直接把 fd 加载到 epoll 的就绪集之中}	else {printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);close_connection(connfd);}}}————————————————
版权声明:本文为CSDN博主「啟明起鸣」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_40901147/article/details/148832206

读操作 conn_list[connfd].send_callback(connfd); 与写操作conn_list[connfd].send_callback(connfd); 是连在一起执行的。
这一点极其不好,如果是要执行计算密度高、输出量大的任务,那就会造成连接的拥塞。这是单线程 EPOLL 事件驱动的 REACTOR 模型的弊端——同步 IO !

如果同步方案不好的话,那就异步咯~~。可是,我曾经在文章 《【网络编程】多路复用的网络 I/O 服务器(C代码),select、poll 和多线程共三个版本》 里指出线程是开销很大的一种计算机资源,8M 一个线程内存栈,而一个线程伺候一个连接的话,我们有百万连接的设计,那也就是说我们需要内存 8000 多个 G 的内存。我们断然无法接受。

慢着!!异步方案一定需要线程的吗?不一定的,我们还有协程这个东西,我们可以做一个纯用户态的协程,wangbojing 的方案是纯 C 语言的框架。

维度NtyColibco/libgo
语言纯 CC++03 / C++17
依赖0(标准库即可)boost / C++ runtime
栈大小8 KB 固定64 KB 起跳
单线程并发官方 120 万官方 50 万
代码行数5 k50 k+

引入

我在博客《【网络编程】简易的 p2p 模型,实现两台虚拟机之间的简单点对点通信,并以小见大观察 TCP 协议的具体运行》 里面介绍过,“一个服务器的功能分两类,一是主动类,另一个是被动类”。所谓的主动类,就是根据用户的输入主动去执行计算任务并且完成输出。所谓的被动类,就是随时就绪 stand-by,随时准备接受来自客户端不知道什么时候发来的消息。

对于被动类的功能,最适合的就是使用等待红黑树全局管理全部连接,使用 EPOLL内核态去实现快速的事件监听,然后接受所有的消息。(有人可能会问,EPOLL 也维护了一个红黑树管理全部连接,要注意的是这个红黑树只有事件+sockfd,无法装载那么多信息)

对于主动类的功能,最适合的就是扔给睡眠红黑树延时延迟,时间到了就加入就绪队列里面,挨个调度执行。或者直接进入就绪队列执行。总之,绝不能再由 EPOLL 事件驱动了,需要用户自己去处理,延时任务交给睡眠树,即时任务交给就绪队列去执行。

NTYCO 的作用是切分了连接的三种状态——就绪执行、睡眠延时、随时就绪等待
在这里插入图片描述
任何一个连接都是绑定了一个轻量级的协程,而协程就在这三种状态之间转换。三种状态是互斥的,在就绪队列就不会在等待树和睡眠树。

代码分析

我只做部分代码解析,不粘贴所有的代码,读者请参考 https://github.com/wangbojing/NtyCo。

什么是协程

在源代码之中是这么定义的

typedef struct _nty_coroutine {//privatenty_cpu_ctx ctx;					// 协程上下文,保存寄存器状态proc_coroutine func;				// 任务协程函数void *arg;							// 任务函数参数void *data;							// 协程私有栈内存指针size_t stack_size;					// 栈总大小(字节)size_t last_stack_size;				// 上次记录的栈使用量(用于内存优化)nty_coroutine_status status;		// 协程状态nty_schedule *sched;				// 所属调度器的指针uint64_t birth;						// 创建时间戳(微秒)uint64_t id;						// 协程唯一IDint fd;								// 关联的文件描述符unsigned short events;   			// 关注的事件(POLLIN/POLLOUT)char funcname[64];					// 协程函数名(用于调试)struct _nty_coroutine *co_join;		// 等待 join 的协程void **co_exit_ptr;					// 协程退出值指针void *stack;						// 协程私有栈void *ebp;							// %rbp 寄存器的数据指针uint32_t ops;						// 操作计数器(用于优先级调整)uint64_t sleep_usecs;				// 睡眠时间(微秒)RB_ENTRY(_nty_coroutine) sleep_node;					// 睡眠红黑树节点(按唤醒时间排序)RB_ENTRY(_nty_coroutine) wait_node;						// 等待IO红黑树节点(按fd排序)LIST_ENTRY(_nty_coroutine) busy_next;					// 忙碌协程链表节点TAILQ_ENTRY(_nty_coroutine) ready_next;					// 就绪队列节点TAILQ_ENTRY(_nty_coroutine) defer_next;  				// 延迟队列节点TAILQ_ENTRY(_nty_coroutine) cond_next;					// 条件变量队列节点TAILQ_ENTRY(_nty_coroutine) io_next;					// IO队列节点TAILQ_ENTRY(_nty_coroutine) compute_next;				// 计算队列节点//	 IO 操作状态struct {void *buf;			// 数据缓冲区size_t nbytes;		// 数据长度int fd;				// 操作的文件描述符int ret;			// 操作返回值int err;			// 错误码} io;//	nty_schedule_sched_wait struct _nty_coroutine_compute_sched *compute_sched;		// 专用计算调度器int ready_fds;											// 就绪的文件描述符数量struct pollfd *pfds;									// poll 使用的文件描述符数组nfds_t nfds;											// poll 文件描述符数量
} nty_coroutine;

这个用户态协程需要模仿线程,拥有
1、任务函数、执行任务函数所需要的参数。一个协程终其一生只能做一个任务。
2、协程的上下文。记录一个协程做到了任务函数的哪一步,用于恢复原样。
3、专用于 CPU 计算推理任务的调度器 _nty_coroutine_compute_sched,保存计算结果的缓冲区(即数据缓冲区)。计算调度器占一个线程,执行具体计算任务可用多个线程,占用其他 CPU,使用线程锁保护数据。这里不赘述。
4、专门用于 I/O 任务的普通调度器 nty_schedule *sched
5、以及管理协程状态的数据结构(比如睡眠//等待红黑树、延迟//就绪队列)。比如,RB_ENTRY(_nty_coroutine) sleep_node

其余的还有杂七杂八的维护信息,比如协程 ID 、创建时间、协定的睡眠时间。

协程就是「可以暂停 / 继续执行的函数」——它让程序在同一个线程里自己决定“我先让出 CPU,等会儿再回来”,既不用线程切换,也不必把逻辑拆成回调。

上下文的定义

CPU 是通过寄存器去记住所传入的参数、执行指令栈、传出参数。而汇编语言是可以直接操作这些的设备零件的。

typedef struct _nty_cpu_ctx {void *esp; //保存 	%rspvoid *ebp; //保存	%rbpvoid *eip; //获取栈顶存储的返回地址 rip, 这个返回地址就是协程被切换时的指令位置void *edi; //保存	%rbpvoid *esi; //保存	%r12void *ebx; //保存	%r13void *r1;  //保存	%r14void *r2;  //保存	%r15void *r3;  //没用到void *r4;  //没用到void *r5;  //没用到
} nty_cpu_ctx;
寄存器合同条款
%rdi, %rsi, %rdx, %rcx, %r8, %r9调用者 → 被调者 传参用;调用者填好即可
%rax返回值;被调者把它填好返回给调用者
%rbx, %rbp, %r12–%r15被调者负责保存;如果被调者要用,先 push,返回前 pop
%r10, %r11调用者负责保存;随便用,不保护也行

上下文切换的函数

#define __volatile__	//	让 GCC 编译器不要优化汇编指令的顺序int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);__asm__ __volatile__ (
"    .text                                  \n"
"       .p2align 4,,15                                   \n"
".globl _switch                                          \n"
".globl __switch                                         \n"
"_switch:                                                \n"
"__switch:                                               \n"
"       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
"       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
"       movq (%rsp), %rax       # save insn_pointer      \n"
"       movq %rax, 16(%rsi)                              \n"
"       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
"       movq %r12, 32(%rsi)                              \n"
"       movq %r13, 40(%rsi)                              \n"
"       movq %r14, 48(%rsi)                              \n"
"       movq %r15, 56(%rsi)                              \n"
"       movq 56(%rdi), %r15                              \n"
"       movq 48(%rdi), %r14                              \n"
"       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
"       movq 32(%rdi), %r12                              \n"
"       movq 24(%rdi), %rbx                              \n"
"       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
"       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
"       movq 16(%rdi), %rax     # restore insn_pointer   \n"
"       movq %rax, (%rsp)                                \n"
"       ret                                              \n"
);static void _exec(void *lt) {nty_coroutine *co = (nty_coroutine*)lt;co->func(co->arg);  //	开始执行任务//	在执行完任务后,状态标记为退出+结束+离开co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH));//	任务函数执行完后,协程让出nty_coroutine_yield(co);	//	切换任务协程}//	协程切换
void nty_coroutine_yield(nty_coroutine *co) {co->ops = 0;_switch(&co->sched->ctx, &co->ctx);
}//	协程恢复——恢复任务执行
//	初始化新协程/加载栈 → 上下文切换 → 状态检查
int nty_coroutine_resume(nty_coroutine *co) {if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {nty_coroutine_init(co);} //	一个线程唯一地拥有一个调度器nty_schedule *sched = nty_coroutine_get_sched();		//	从获取调度器的指针sched->curr_thread = co;								//	调度器里的当前处理协程_switch(&co->ctx, &co->sched->ctx);						//	切换上下文————相当于在执行函数nty_coroutine_madvise(co);								//	协程栈的内存优化sched->curr_thread = NULL;								// 	清空当前运行协程标记	if (co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) {	//	协程的退出状态if (co->status & BIT(NTY_COROUTINE_STATUS_DETACH)) {nty_coroutine_free(co);							//	协程退出后,若为 DETACH 状态则立即释放资源。}return -1;} return 0;
}//	协程的初始化函数
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **)(co->stack + co->stack_size);		//	栈顶指针stack[-3] = NULL;stack[-2] = (void *)co;co->ctx.esp = (void*)stack - (4 * sizeof(void*));			//	取出寄存器的信息 %rsp,这只是形式上的,因为一开始根本没有记录上下文co->ctx.ebp = (void*)stack - (3 * sizeof(void*));			//	取出寄存器信息 %rbpco->ctx.eip = (void*)_exec;			//	获取栈顶存储的返回地址 rip, 这个返回地址就是协程被切换时的指令位置co->status = BIT(NTY_COROUTINE_STATUS_READY);				//	本协程已就绪}

I/O 调度器

typedef struct _nty_schedule {uint64_t birth;							//	调度器创建时间戳(微秒级)nty_cpu_ctx ctx;						// 调度器上下文void *stack;							// 调度器的主栈内存指针(核心字段)size_t stack_size;						// 栈大小int spawned_coroutines;					// 当前创建的协程总数uint64_t default_timeout;				// 默认超时时间(微秒)struct _nty_coroutine *curr_thread;		// 指向当前正在运行的协程int page_size;							// 系统内存页大小(通常4KB)int poller_fd;							// epoll实例的文件描述符int eventfd;							// 用于唤醒epoll_wait的事件fdstruct epoll_event eventlist[NTY_CO_MAX_EVENTS];	// epoll返回的事件数组int nevents;							// 当前待处理事件数量int num_new_events;						// 新增事件计数pthread_mutex_t defer_mutex;			// 延迟队列的互斥锁nty_coroutine_queue ready;					// 就绪队列(可立即运行的协程)nty_coroutine_queue defer;					// 延迟队列(稍后执行的协程)nty_coroutine_link busy;					//	活跃协程链表nty_coroutine_rbtree_sleep sleeping;		//	睡眠协程红黑树(按唤醒时间排序)nty_coroutine_rbtree_wait waiting;			//	等待IO协程红黑树(按fd排序)//private } nty_schedule;

计算调度器

一个协程一生只会做一件事, I/O 协程想要完成计算任务只能委托其他协程去做。做完是可以通知委托协程的

//	计算调度器
typedef struct _nty_coroutine_compute_sched {nty_cpu_ctx ctx;												//	自定义汇编上下文nty_coroutine_queue coroutines;									//	协程队列 (coroutines)nty_coroutine *curr_coroutine;									//	当前协程指针 (curr_coroutine)//	运行控制同步pthread_mutex_t run_mutex;pthread_cond_t run_cond;//	协程队列锁 (co_mutex)pthread_mutex_t co_mutex;										//	保护协程队列的并发访问LIST_ENTRY(_nty_coroutine_compute_sched) compute_next;			//	将多个计算调度器组织成链表nty_coroutine_compute_status compute_status;					//	计算状态 (compute_status)
} nty_coroutine_compute_sched;

协程让出 CPU,完成异步 I/O 的关键一步

nty_poll_inner 函数内置了协程调度函数 nty_coroutine_yield


//	这是一个事件转换函数,因为我们使用poll的超时计算,传入 POLL 事件,传出的是 EPOLL 事件
static uint32_t nty_pollevent_2epoll( short events )
{uint32_t e = 0;	if( events & POLLIN ) 	e |= EPOLLIN;if( events & POLLOUT )  e |= EPOLLOUT;if( events & POLLHUP ) 	e |= EPOLLHUP;if( events & POLLERR )	e |= EPOLLERR;if( events & POLLRDNORM ) e |= EPOLLRDNORM;if( events & POLLWRNORM ) e |= EPOLLWRNORM;return e;
}
static short nty_epollevent_2poll( uint32_t events )
{short e = 0;	if( events & EPOLLIN ) 	e |= POLLIN;if( events & EPOLLOUT ) e |= POLLOUT;if( events & EPOLLHUP ) e |= POLLHUP;if( events & EPOLLERR ) e |= POLLERR;if( events & EPOLLRDNORM ) e |= POLLRDNORM;if( events & EPOLLWRNORM ) e |= POLLWRNORM;return e;
}
/** nty_poll_inner --> 1. sockfd--> epoll, 2 yield, 3. epoll x sockfd* fds : * * POLL 最出名的是其超时处理*/
//	以下的这个函数其实是发生在任何一个读任务之前的,毕竟读任务其实是一个被动技能,他会让这个协程让出给调度器,
//	并且进入睡眠状态,等待后台把事情给完成了再让这个写成醒过来
static int nty_poll_inner(struct pollfd *fds, nfds_t nfds, int timeout) {//	nfds_t 是在 <poll.h> 头文件中定义的,本质是一个无符号长整型if (timeout == 0){return poll(fds, nfds, timeout);	//	超时参数 timeout 的单位是 毫秒 (milliseconds)。}if (timeout < 0){timeout = INT_MAX;		//	意为无限阻塞}nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) {printf("scheduler not exit!\n");return -1;}nty_coroutine *co = sched->curr_thread;//	int i = 0;for (i = 0;i < nfds;i ++) {struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);		//	这个 event 将会是 POLLOUT | POLLERR | POLLHUPev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev);	co->events = fds[i].events;nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout);}nty_coroutine_yield(co); 		//	控制权交还给调度器,当然上下文也被保存了//	什么情况下这个写成才会回归呢?for (i = 0;i < nfds;i ++) {struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev);		//	这个 fd 已经把消息都读完了,之后就不需要 epoll 了nty_schedule_desched_wait(fds[i].fd);}return nfds;
}

nty_poll_inner 函数将会用在各种网络函数的封装上,保证在执行下一个 I/O 之前,先把 CPU 让出,但又把命令全部都记住了。这就好像一个学生,在考试前偷偷把考试题目背下来。比如,下面的 nty_recv 就是 NTYCO 改良后的 recv 函数

ssize_t nty_recv(int fd, void *buf, size_t len, int flags) {struct pollfd fds;fds.fd = fd;fds.events = POLLIN | POLLERR | POLLHUP;nty_poll_inner(&fds, 1, 1);		//	超时参数 timeout 的单位是 毫秒 (milliseconds)。这里说明了 recv 的超时时间是 1 毫秒(10 的负三次方)int ret = recv(fd, buf, len, flags);if (ret < 0) {//if (errno == EAGAIN) return ret;if (errno == ECONNRESET) return -1;//printf("recv error : %d, ret : %d\n", errno, ret);}return ret;
}

创建协程与协程调度器

需要清楚的是,先创建协程协程,发现没有调度器再顺便创造,也就是说协程调度器的创建是被动创建的。首先是创建协程,

// coroutine --> 
// create 
// 协程创建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);//	使用 pthread_once 确保全局调度器键值 (global_sched_key) 只初始化一次//	首次调用时触发 nty_coroutine_sched_key_creator 创建 TLS 键// 获取当前线程的调度器nty_schedule *sched = nty_coroutine_get_sched();//	惰性初始化:每个线程首次创建协程时自动创建专属调度器//	参数 0 表示使用默认栈大小(通常为 128KB)if (sched == NULL) {nty_schedule_create(0);sched = nty_coroutine_get_sched();if (sched == NULL) {printf("Failed to create scheduler\n");return -1;		// 错误码 -1: 调度器创建失败}}nty_coroutine* co = calloc(1, sizeof(nty_coroutine));if (co == NULL) {printf("Failed to allocate memory for new coroutine\n");return -2;}// 自定义汇编模式:手动分配对齐的栈内存,posix_memalign 是 POSIX 标准提供的 按指定对齐方式分配动态内存 的函数。// getpagesize() 是 POSIX 提供的系统调用/库函数,用来获取当前系统 内存页(page) 的大小(字节数)。// 内存页就是操作系统管理物理内存的最小单位,把物理内存想成一本很厚的草稿本,页就是固定大小的活页纸。int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);//	内存对齐:按页大小对齐(通常 4KB),避免 MMU 分页错误//	手动管理:后续需要显式释放 (free(co->stack))if (ret) {printf("Failed to allocate stack for new coroutine\n");free(co);return -3;}co->stack_size = sched->stack_size;			//	协程栈大小等于调度器栈的大小,128 KBco->sched = sched;							//	就像线程池与工作线程互持股份一样,是为了相互通信co->status = BIT(NTY_COROUTINE_STATUS_NEW); //	创建状态co->id = sched->spawned_coroutines ++;		//	只要创建一个协程,它的 ID 号就是当前所建立的协程数 +1co->func = func;							//	协程的任务函数co->fd = -1;								// 初始化文件描述符co->events = 0;								// 初始化事件掩码co->arg = arg;								// 保存用户参数co->birth = nty_coroutine_usec_now();		// 记录创建时间戳*new_co = co;								// 返回创建的协程指针// ── 加入调度队列(双向链表队列) ──────────────────────TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);return 0;
}

紧接着是,调度器创建

int nty_schedule_create(int stack_size) {int sched_stack_size = stack_size ? stack_size : NTY_CO_MAX_STACKSIZE;nty_schedule *sched = (nty_schedule*)calloc(1, sizeof(nty_schedule));if (sched == NULL) {printf("Failed to initialize scheduler\n");return -1;}//	pthread_setspecific(global_sched_key, sched) 是专门为调度器 sched 设置一把钥匙assert(pthread_setspecific(global_sched_key, sched) == 0);//	调度器掌握 EPOLL 的套接字管理 epfd sched->poller_fd = nty_epoller_create();if (sched->poller_fd == -1) {printf("Failed to initialize epoller\n");nty_schedule_free(sched);return -2;}nty_epoller_ev_register_trigger();			//	调度器 sched 拥有计时器sched->stack_size = sched_stack_size;		//	调度器栈的大小为 128 KBsched->page_size = getpagesize();			//	调度器的页大小为 4 KBsched->stack = NULL;bzero(&sched->ctx, sizeof(nty_cpu_ctx));	//	初始化当前协程的上下文sched->spawned_coroutines = 0;				//	已建立多少个协程sched->default_timeout = 3000000u;			//	把调度器的默认超时设成 3 秒。RB_INIT(&sched->sleeping);					//	初始化睡眠树RB_INIT(&sched->waiting);					//	初始化等待树sched->birth = nty_coroutine_usec_now();	//	调度器的创立计时TAILQ_INIT(&sched->ready);					//	就绪队列TAILQ_INIT(&sched->defer);					//	延迟队列LIST_INIT(&sched->busy);					//	忙链表}

协程的运行方式

调度器与协程之间相互协调,协程执行后让出 CPU 给调度器,调度器查看睡眠树的协程是否苏醒了,就序列表里面是否准备好了,等待树里面是否有突发情况。如果有的话,挨个让出 CPU 给这些协程。普通的单线程执行 I/O 任务就够了,如果遇上计算密集型,那得在设计计算调度器的调度函数(源代码里面并没有关于计算调度器的运行代码实例)。

void nty_schedule_run(void) {nty_schedule *sched = nty_coroutine_get_sched();if (sched == NULL) return ;//	执行协程有三个来源:睡眠树+就绪队列+等待红黑树,如此循环着来(有优先级)while (!nty_schedule_isdone(sched)) {	//	如果调度器没有启动的话// 1. expired --> sleep rbtree	第一步:处理超时协程(睡眠红黑树),就是观察预定的时间到了与否nty_coroutine *expired = NULL;while ((expired = nty_schedule_expired(sched)) != NULL) {	//	只移除那些 “成熟” 的节点nty_coroutine_resume(expired);	//	这个是在 nty_coroutine.c 里面定义的,从睡眠红黑树摘下来后,就会被拿去立即执行}// 2. ready queue	第二步:处理就绪协程队列,全部处理,所有刚生成的协程都会进入这个队列里面nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);while (!TAILQ_EMPTY(&sched->ready)) {nty_coroutine *co = TAILQ_FIRST(&sched->ready);TAILQ_REMOVE(&co->sched->ready, co, ready_next);if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {	//	协程状态为结束的话,释放协程资源nty_coroutine_free(co);break;}nty_coroutine_resume(co);		//	立即执行if (co == last_co_ready) break;}// 3. wait rbtree 等待树,这里绕了一圈,用户发送数据,EPOLL 只有可能接受 EPOLLIN 事件(不判断),从等待树里找到协程,协程再给出上下文(相当于未雨绸缪)nty_schedule_epoll(sched);			//	超时为 t 的 EPOLL 实例识别 IO 事件的出发,并且内容已经保存在了调度器 sched 上面while (sched->num_new_events) {		//	有限次循环int idx = --sched->num_new_events;struct epoll_event *ev = sched->eventlist+idx;int fd = ev->data.fd;int is_eof = ev->events & EPOLLHUP;		//	IO 文件被挂断,这是 Linux 内核负责标记的if (is_eof) errno = ECONNRESET;nty_coroutine *co = nty_schedule_search_wait(fd);if (co != NULL) {if (is_eof) {co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);	//	协程也被打上标签,执行最后这一次了}nty_coroutine_resume(co);	//	立即执行,这不是回调函数}is_eof = 0;}}nty_schedule_free(sched);return ;
}

我们观察到计算等待树的协程触发是需要 EPOLL 事件去驱动的,因为等待树意味着不确定的外界输入。

http://www.xdnf.cn/news/1375633.html

相关文章:

  • 零后端、零配置:用 AI 编程工具「Cursor」15 分钟上线「Vue3 留言墙」
  • 【双指针- LeetCode】15.三数之和
  • python自学笔记14 NumPy 线性代数
  • anaconda本身有一个python环境(base),想用别的环境就是用anaconda命令行往anaconda里创建虚拟环境
  • 前端架构知识体系:css架构模式和代码规范
  • vscode 如何调试 python 2.7
  • springboot设计开发之基于springboot的校园社团管理系统/基于java的社团管理系统
  • UTXO 模型及扩展模型
  • Android -第二十一次技术总结
  • 海康相机的 HB 模式功能详解
  • Part 1️⃣:相机几何与单视图几何-第六章:相机模型
  • 【Redis 进阶】Redis 典型应用 —— 缓存(cache)
  • 【大前端】封装一个React Native与Android/IOS 端通用的埋点接口
  • 储能站运维管理一体化平台 | 图扑数字孪生
  • 《Linux 网络编程四:TCP 并发服务器:构建模式、原理及关键技术(以select )》
  • Linux 软件编程(十二)网络编程:TCP 并发服务器构建与 IO 多路复用
  • PPT处理控件Aspose.Slides教程:在.NET中开发SVG到EMF的转换器
  • 爬虫基础学习 - Xpath
  • 设计模式与设计原则简介——及其设计模式学习方法
  • 优选算法-常见位运算总结
  • uniapp中 ios端 scroll-view 组件内部子元素z-index失效问题
  • 基于 Node.js 的淘宝 API 接口开发:快速构建异步数据采集服务
  • 汽车电气系统的发展演进为测试带来了哪些影响?
  • DeFi协议Lombard能突破比特币生态原生叙事困境吗?
  • 图表可视化地理趋势-Telerik WPF Chart
  • 【Day 35】Linux-主从复制的维护
  • (LeetCode 面试经典 150 题 ) 637. 二叉树的层平均值(深度优先搜索dfs)
  • 亚马逊广告关键词排名提升的五大核心策略解析
  • java简单ssm(spring+springmvc+mybatis)框架结构demo
  • 大模型重构建筑“能耗基因“:企业如何用物联中台打响能源革命?