AirSim/Cosys-AirSim 游戏开发(一)XBox 手柄 Windows + python 连接与读取
这个系列用来记录在开发 AirSim 应用过程中遇到的一些问题和解决方案,由于 AirSim 已经停止维护了,因此我实际的开发平台是 Cosys-AirSim,但这个 fork 在编译和部署的时候有不少坑,后续我会找机会补上。
第一篇博客实际上不需要你编译和部署 AirSim 和 Cosys-AirSim,主要是验证一下 Xbox 游戏手柄是否可用以及基本的通讯功能是否正常。
- AirSim 官方 GitHub 仓库:https://github.com/microsoft/AirSim
- Cosys-AirSim 官方 GitHub 仓库:https://github.com/Cosys-Lab/Cosys-AirSim
这篇博客涉及到的代码我都放在 GitHub 仓库中,欢迎大家 Issue Bug:
- GaohaoZhou-ops/XboxControllerReader
1. 硬件准备
我这里使用的是 Xbox 无线控制器,但连接方式使用的是 USB 连接,因为主机没有蓝牙收发器还需要额外买一个蓝牙增强模块。
- Xbox 无线控制器产品连接:https://www.xbox.com/zh-CN/accessories/controllers/xbox-wireless-controller#white
正确连接后手柄的 XBox 指示灯会常亮,如果这个灯一闪一闪的则说明没有正确连接,在Windows平台上通常会自动弹出驱动安装确认,将驱动装上即可。
2. GUI 工具测试手柄
在写代码之前建议先用一些免费工具来测试手柄各个按键是否正常,虽然网上有很多免费工具,但我自己用的惯的还是 Microsoft Stroe 里面的一个工具 Controller Tester
,可以直接在商店里面搜到:
安装完成后直接打开软件可以看到下面的画面,将手柄上的按钮全部按下,每检测到一个有效触发就会将其标绿,两个遥感和后面 LT
和 RT
本质上是线性轴:
3. pygame 测试代码
我这里使用的是 conda 管理python包,在运行代码之前需要安装以下依赖:
(airsim) $ pip install pygame
然后执行下面的代码,代码中的 read_gamepad_buttons()
函数参数通常是上面软件显示的 Controller X
后面跟着的数字,即设备索引号。
【Note】:下面的代码只能在本地运行,即便你通过 ssh 过来运行也会显示没有监测到手柄,原因好像是 windows 平台下 USB 需要映射成 IP 端口,后面我整明白了会补充到这里。
import pygame, os, timedef read_gamepad_buttons(joy_device_index:int=0):"""读取游戏手柄上所有按键值。:return: 按键状态字典"""pygame.init()pygame.joystick.init()if pygame.joystick.get_count() == 0:print("未检测到任何游戏手柄, pygame.joystick.get_count()=0")return Nonejoystick = pygame.joystick.Joystick(joy_device_index)joystick.init()button_states = {}try:while True:time.sleep(0.1)pygame.event.pump()button_states = []balls_states = []axes_states = []hat_states = []for i in range(joystick.get_numbuttons()):button_states.append(joystick.get_button(i))for i in range(joystick.get_numballs()):balls_states.append(joystick.get_ball(i))for i in range(joystick.get_numaxes()):axes_states.append(joystick.get_axis(i))for i in range(joystick.get_numhats()):hat_states.append(joystick.get_hat(i))print('-' * 50)print(f'Button {len(button_states)}: {button_states}')print(f'Balls {len(balls_states)}: {balls_states}')print(f'Axes {len(axes_states)}: {axes_states}')print(f'Hat {len(hat_states)}: {hat_states}')except KeyboardInterrupt:print("游戏手柄读取终止。")finally:pygame.quit()return button_statesif __name__ == '__main__':read_gamepad_buttons(0)
有下面的输出就说明手柄被正确连接:
--------------------------------------------------
Button 16: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Balls 0: []
Axes 6: [0.0, 0.0, 0.0, 0.0, -1.0, -1.0]
Hat 1: [(0, 0)]
--------------------------------------------------
根据自己业务需要去映射各个按键的功能。
【Note】:在 Windows 和 Linux 下部分按键的定义和索引是不同的,通常情况下代码是不通用的。如 LT
按键 RT
在 Windows 下被定义为 Axes
,但在 Linux 下是 Button
。
4. 类封装
为了方便自己和大家使用,我将上面的代码进行了封装,由于我个人通常需要在异步场景下获取手柄状态,因此这里的封装分为两种形式:异步 & 同步。
4.1 异步封装 XBoxControllerReaderAsync
import pygame
import asyncio
from typing import List, Tupleclass XBoxControllerReaderAsync:def __init__(self, joy_device_index: int = 0, poll_interval: float = 0.05):self.joy_device_index = joy_device_indexself.poll_interval = poll_intervalself.button_states: List[int] = []self.ball_states: List[Tuple[int, int]] = []self.axis_states: List[float] = []self.hat_states: List[Tuple[int, int]] = []self._running = Falseself._task = Noneself._joystick = Noneasync def start(self):pygame.init()pygame.joystick.init()if pygame.joystick.get_count() == 0:raise RuntimeError("未检测到任何游戏手柄,pygame.joystick.get_count()=0")self._joystick = pygame.joystick.Joystick(self.joy_device_index)self._joystick.init()self._running = Trueself._task = asyncio.create_task(self._poll_loop())async def stop(self):self._running = Falseif self._task:await self._taskpygame.quit()async def _poll_loop(self):while self._running:pygame.event.pump() # 处理事件队列self.button_states = [self._joystick.get_button(i) for i in range(self._joystick.get_numbuttons())]self.ball_states = [self._joystick.get_ball(i) for i in range(self._joystick.get_numballs())]self.axis_states = [self._joystick.get_axis(i) for i in range(self._joystick.get_numaxes())]self.hat_states = [self._joystick.get_hat(i) for i in range(self._joystick.get_numhats())]await asyncio.sleep(self.poll_interval)def get_button_states(self) -> List[int]:return self.button_statesdef get_axis_states(self) -> List[float]:return self.axis_statesdef get_ball_states(self) -> List[Tuple[int, int]]:return self.ball_statesdef get_hat_states(self) -> List[Tuple[int, int]]:return self.hat_states
运行下面的代码进行测试:
async def main():reader = AsyncGamepadReader()await reader.start()try:for _ in range(100):print("Buttons:", reader.get_button_states())print("Axes: ", reader.get_axis_states())print("Hats: ", reader.get_hat_states())print("-" * 40)await asyncio.sleep(0.1)finally:await reader.stop()asyncio.run(main())
4.2 同步封装 XBoxControllerReaderSync
import pygame
import threading
import time
from typing import List, Tupleclass XBoxControllerReaderSync:def __init__(self, joy_device_index: int = 0, poll_interval: float = 0.05):self.joy_device_index = joy_device_indexself.poll_interval = poll_intervalself.button_states: List[int] = []self.ball_states: List[Tuple[int, int]] = []self.axis_states: List[float] = []self.hat_states: List[Tuple[int, int]] = []self._running = Falseself._thread = Noneself._joystick = Nonedef start(self):self._running = Trueself._thread = threading.Thread(target=self._poll_loop, daemon=True)self._thread.start()def stop(self):self._running = Falseif self._thread:self._thread.join()pygame.quit()print("GamepadReader stopped and pygame quit.")def _poll_loop(self):print("Initializing pygame...")pygame.init()pygame.joystick.init()count = pygame.joystick.get_count()print(f"Detected {count} joystick(s)")if count == 0:raise RuntimeError("未检测到任何游戏手柄,pygame.joystick.get_count()=0")self._joystick = pygame.joystick.Joystick(self.joy_device_index)self._joystick.init()print(f"Joystick {self._joystick.get_name()} initialized.")while self._running:try:pygame.event.pump()self.button_states = [self._joystick.get_button(i)for i in range(self._joystick.get_numbuttons())]self.ball_states = [self._joystick.get_ball(i)for i in range(self._joystick.get_numballs())]self.axis_states = [self._joystick.get_axis(i)for i in range(self._joystick.get_numaxes())]self.hat_states = [self._joystick.get_hat(i)for i in range(self._joystick.get_numhats())]except pygame.error as e:print(f"Pygame error during polling: {e}")time.sleep(self.poll_interval)def get_button_states(self) -> List[int]:return self.button_statesdef get_axis_states(self) -> List[float]:return self.axis_statesdef get_ball_states(self) -> List[Tuple[int, int]]:return self.ball_statesdef get_hat_states(self) -> List[Tuple[int, int]]:return self.hat_states
运行下面的代码测试:
if __name__ == "__main__":reader = GamepadReader()try:reader.start()for _ in range(100):print("Buttons:", reader.get_button_states())print("Axes: ", reader.get_axis_states())print("Hats: ", reader.get_hat_states())print("-" * 40)time.sleep(0.1)finally:reader.stop()
4.3 同时测试
在运行下面的测试代码前需要确保你的文件结构如下,其中 xbox_controller_async.py
存放异步封装类代码; xbox_controller_sync.py
存放同步封装类代码:
Mode LastWriteTime Length Name
---- ------------- ------ ----
d----- 6/4/2025 1:38 PM __pycache__
-a---- 6/4/2025 1:18 PM 1656 test.py
-a---- 6/4/2025 1:42 PM 2213 xbox_controller_async.py
-a---- 6/4/2025 1:41 PM 2731 xbox_controller_sync.py
然后运行 test.py
的代码,这里会先测试异步后测试同步:
import asyncio
from xbox_controller_async import XBoxControllerReaderAsync
from xbox_controller_sync import XBoxControllerReaderSync
import asyncio
import timeasync def test_async_reader(duration=5):print("=== 异步读取开始 ===")reader = XBoxControllerReaderAsync()await reader.start()start = time.time()try:while time.time() - start < duration:print("Async - Buttons:", reader.get_button_states())print("Async - Axes: ", reader.get_axis_states())print("Async - Hats: ", reader.get_hat_states())print("-" * 40)await asyncio.sleep(0.1)finally:await reader.stop()print("=== 异步读取结束 ===\n")def test_sync_reader(duration=5):print("=== 同步读取开始 ===")reader = XBoxControllerReaderSync()reader.start()start = time.time()try:while time.time() - start < duration:print("Sync - Buttons:", reader.get_button_states())print("Sync - Axes: ", reader.get_axis_states())print("Sync - Hats: ", reader.get_hat_states())print("-" * 40)time.sleep(0.1)finally:reader.stop()print("=== 同步读取结束 ===")async def main():await test_async_reader(duration=5)await asyncio.sleep(1)test_sync_reader(duration=5)if __name__ == "__main__":asyncio.run(main())