当前位置: 首页 > news >正文

Rust Async 异步编程(五):执行器和系统 I/O

Rust Async 异步编程(五):执行器和系统 I/O

  • Rust Async 异步编程(五):执行器和系统 I/O

Rust Async 异步编程(五):执行器和系统 I/O

前面我们一起看过一个使用 Future 从 Socket 中异步读取数据的例子:

pub struct SocketRead<'a> {socket: &'a Socket,
}impl SimpleFuture for SocketRead<'_> {type Output = Vec<u8>;fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if self.socket.has_data_to_read() {// socket 有数据,写入 buffer 中并返回Poll::Ready(self.socket.read_buf())} else { // socket 中还没数据// 注册一个 wake 函数,当数据可用时,该函数会被调用,// 然后当前 Future 的执行器会再次调用 poll 方法,此时就可以读取到数据self.socket.set_readable_callback(wake);Poll::Pending}}
}

该例子中,Future 将从 Socket 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 Future。

当数据准备好后,会调用 wake() 函数将该 Future 的任务放入任务通道中,等待执行器的 poll。

关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:set_readable_callback 方法到底是怎么工作的?怎么才能知道 socket 中的数据已经可以被读取了?

在现实世界中,该问题往往是通过操作系统提供的 I/O 多路复用机制来完成,例如 Linux 中的 epoll,FreeBSD 和 macOS 中的 kqueue,Windows 中的 IOCP,Fuchisa中的 ports 等(可以通过 Rust 的跨平台包 mio 来使用它们)。借助 I/O 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 I/O 事件,一旦某个事件完成就立即退出阻塞并返回数据。

相关实现类似于以下代码:

struct IoBlocker { /* ... */ }struct Event {// Event 的唯一 ID,该事件发生后,就会被监听起来id: usize,// 一组需要等待或者已发生的信号signals: Signals,
}impl IoBlocker {/// 创建需要阻塞等待的异步 I/O 事件的集合fn new() -> Self { /* ... */ }/// 对指定的 I/O 事件表示兴趣fn add_io_event_interest(&self,/// 事件所绑定的 socketio_object: &IoObject,event: Event,) { /* ... */ }/// 进入阻塞,直到某个事件出现fn block(&self) -> Event { /* ... */ }
}let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(&socket_1,Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(&socket_2,Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();// 当 socket 的数据可以读取时,打印 "Socket 1 is now READABLE" 
println!("Socket {:?} is now {:?}", event.id, event.signals);

Future 执行者可以使用这些 I/O 多路复用机制提供异步 I/O 对象,例如 socket,它就可以当特定 IO 事件发生时通过配置回调来运行

针对我们 SocketRead 例子,Socket::set_readable_callback 的伪代码大致如下:

impl Socket {fn set_readable_callback(&self, waker: Waker) {let local_executor = self.local_executor;let id = self.id;local_executor.event_map.insert(id, waker);local_executor.add_io_event_interest(&self.socket_file_descriptor,Event { id, signals: READABLE },);}
}

现在,我们就只有一个执行者线程,它可以接收 I/O 事件,并将它们分配到适合的 Waker,这将唤醒相应的任务,并允许执行者在返回检查更多的 I/O 事件之前,驱动更多的任务完成(循环继续…)。

这样,我们只需要一个执行器线程,它会接收 I/O 事件并将其分发到对应的 Waker 中,接着后者会唤醒相关的任务,最终通过执行器 poll 后,任务可以顺利的继续执行,这种 I/O 读取流程可以不停的循环,直到 socket 关闭。

参考:

  1. https://github.com/rustcn-org/async-book
  2. https://www.bilibili.com/video/BV1Ki4y1C7gj
http://www.xdnf.cn/news/1319851.html

相关文章:

  • Spring 创建 Bean 的 8 种主要方式
  • MXFP4量化:如何在80GB GPU上运行1200亿参数的GPT-OSS模型
  • 【SpringBoot】Swagger 接口工具
  • 如何在Windows系统中更改用户名(中文转英文全流程)
  • 云原生俱乐部-RH134知识点总结(2)
  • MySQL数据库备份与恢复
  • neo4j导入导出方法
  • 25年第十本【金钱心理学】
  • 半敏捷卫星观测调度系统的设计与实现
  • 《WINDOWS 环境下32位汇编语言程序设计》第3章 使用MASM
  • Effective C++ 条款46:需要类型转换时请为模板定义非成员函数
  • Critic-V: VLM Critics Help Catch VLM Errors in Multimodal Reasoning(CVPR 2025)
  • 飞算AI 3.2.0实战评测:10分钟搭建企业级RBAC权限系统
  • 【牛客刷题】求四个数的最小公约数:两种高效解法详解(枚举法和最大公约数法)
  • 华为云之Linux系统安装部署Tomcat服务器
  • 【技术博客】480p 老番 → 8K 壁纸:APISR × SUPIR × CCSR「多重高清放大」完全指南
  • YoloV9改进策略:Block改进-DCAFE,并行双坐标注意力机制,增强长程依赖与抗噪性-即插即用
  • 【Golang】:函数和包
  • HTTPS 配置与动态 Web 内容部署指南
  • 数组实现各类数据结构
  • 创建工作空间与功能包
  • nodejs 中间件
  • 科目二的四个电路
  • Windows运维之以一种访问权限不允许的方式做了一个访问套接字的尝试
  • 健身房预约系统SSM+Mybatis实现(三、校验 +页面完善+头像上传)
  • es7.17.x es服务yellow状态的排查查看节点,分片状态数量
  • 生成模型实战 | InfoGAN详解与实现
  • 1. Docker的介绍和安装
  • 安装pytorch3d后报和本机cuda不符
  • gitee 流水线+docker-compose部署 nodejs服务+mysql+redis