当前位置: 首页 > ds >正文

AirSim/Cosys-AirSim 游戏开发(一)XBox 手柄 Windows + python 连接与读取

这个系列用来记录在开发 AirSim 应用过程中遇到的一些问题和解决方案,由于 AirSim 已经停止维护了,因此我实际的开发平台是 Cosys-AirSim,但这个 fork 在编译和部署的时候有不少坑,后续我会找机会补上。

第一篇博客实际上不需要你编译和部署 AirSim 和 Cosys-AirSim,主要是验证一下 Xbox 游戏手柄是否可用以及基本的通讯功能是否正常。

  • AirSim 官方 GitHub 仓库:https://github.com/microsoft/AirSim
  • Cosys-AirSim 官方 GitHub 仓库:https://github.com/Cosys-Lab/Cosys-AirSim

这篇博客涉及到的代码我都放在 GitHub 仓库中,欢迎大家 Issue Bug:

  • GaohaoZhou-ops/XboxControllerReader

1. 硬件准备

我这里使用的是 Xbox 无线控制器,但连接方式使用的是 USB 连接,因为主机没有蓝牙收发器还需要额外买一个蓝牙增强模块。

  • Xbox 无线控制器产品连接:https://www.xbox.com/zh-CN/accessories/controllers/xbox-wireless-controller#white

在这里插入图片描述

正确连接后手柄的 XBox 指示灯会常亮,如果这个灯一闪一闪的则说明没有正确连接,在Windows平台上通常会自动弹出驱动安装确认,将驱动装上即可。


2. GUI 工具测试手柄

在写代码之前建议先用一些免费工具来测试手柄各个按键是否正常,虽然网上有很多免费工具,但我自己用的惯的还是 Microsoft Stroe 里面的一个工具 Controller Tester,可以直接在商店里面搜到:

在这里插入图片描述

安装完成后直接打开软件可以看到下面的画面,将手柄上的按钮全部按下,每检测到一个有效触发就会将其标绿,两个遥感和后面 LTRT 本质上是线性轴:

在这里插入图片描述


3. pygame 测试代码

我这里使用的是 conda 管理python包,在运行代码之前需要安装以下依赖:

(airsim) $ pip install pygame

然后执行下面的代码,代码中的 read_gamepad_buttons() 函数参数通常是上面软件显示的 Controller X 后面跟着的数字,即设备索引号。

【Note】:下面的代码只能在本地运行,即便你通过 ssh 过来运行也会显示没有监测到手柄,原因好像是 windows 平台下 USB 需要映射成 IP 端口,后面我整明白了会补充到这里。

import pygame, os, timedef read_gamepad_buttons(joy_device_index:int=0):"""读取游戏手柄上所有按键值。:return: 按键状态字典"""pygame.init()pygame.joystick.init()if pygame.joystick.get_count() == 0:print("未检测到任何游戏手柄, pygame.joystick.get_count()=0")return Nonejoystick = pygame.joystick.Joystick(joy_device_index)joystick.init()button_states = {}try:while True:time.sleep(0.1)pygame.event.pump()button_states = []balls_states = []axes_states = []hat_states = []for i in range(joystick.get_numbuttons()):button_states.append(joystick.get_button(i))for i in range(joystick.get_numballs()):balls_states.append(joystick.get_ball(i))for i in range(joystick.get_numaxes()):axes_states.append(joystick.get_axis(i))for i in range(joystick.get_numhats()):hat_states.append(joystick.get_hat(i))print('-' * 50)print(f'Button {len(button_states)}: {button_states}')print(f'Balls  {len(balls_states)}: {balls_states}')print(f'Axes   {len(axes_states)}: {axes_states}')print(f'Hat    {len(hat_states)}: {hat_states}')except KeyboardInterrupt:print("游戏手柄读取终止。")finally:pygame.quit()return button_statesif __name__ == '__main__':read_gamepad_buttons(0)

有下面的输出就说明手柄被正确连接:

--------------------------------------------------
Button 16: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Balls  0: []
Axes   6: [0.0, 0.0, 0.0, 0.0, -1.0, -1.0]
Hat    1: [(0, 0)]
--------------------------------------------------

根据自己业务需要去映射各个按键的功能。

【Note】:在 Windows 和 Linux 下部分按键的定义和索引是不同的,通常情况下代码是不通用的。如 LT 按键 RT 在 Windows 下被定义为 Axes ,但在 Linux 下是 Button


4. 类封装

为了方便自己和大家使用,我将上面的代码进行了封装,由于我个人通常需要在异步场景下获取手柄状态,因此这里的封装分为两种形式:异步 & 同步。

4.1 异步封装 XBoxControllerReaderAsync

import pygame
import asyncio
from typing import List, Tupleclass XBoxControllerReaderAsync:def __init__(self, joy_device_index: int = 0, poll_interval: float = 0.05):self.joy_device_index = joy_device_indexself.poll_interval = poll_intervalself.button_states: List[int] = []self.ball_states: List[Tuple[int, int]] = []self.axis_states: List[float] = []self.hat_states: List[Tuple[int, int]] = []self._running = Falseself._task = Noneself._joystick = Noneasync def start(self):pygame.init()pygame.joystick.init()if pygame.joystick.get_count() == 0:raise RuntimeError("未检测到任何游戏手柄,pygame.joystick.get_count()=0")self._joystick = pygame.joystick.Joystick(self.joy_device_index)self._joystick.init()self._running = Trueself._task = asyncio.create_task(self._poll_loop())async def stop(self):self._running = Falseif self._task:await self._taskpygame.quit()async def _poll_loop(self):while self._running:pygame.event.pump()  # 处理事件队列self.button_states = [self._joystick.get_button(i) for i in range(self._joystick.get_numbuttons())]self.ball_states = [self._joystick.get_ball(i) for i in range(self._joystick.get_numballs())]self.axis_states = [self._joystick.get_axis(i) for i in range(self._joystick.get_numaxes())]self.hat_states = [self._joystick.get_hat(i) for i in range(self._joystick.get_numhats())]await asyncio.sleep(self.poll_interval)def get_button_states(self) -> List[int]:return self.button_statesdef get_axis_states(self) -> List[float]:return self.axis_statesdef get_ball_states(self) -> List[Tuple[int, int]]:return self.ball_statesdef get_hat_states(self) -> List[Tuple[int, int]]:return self.hat_states

运行下面的代码进行测试:

async def main():reader = AsyncGamepadReader()await reader.start()try:for _ in range(100):print("Buttons:", reader.get_button_states())print("Axes:   ", reader.get_axis_states())print("Hats:   ", reader.get_hat_states())print("-" * 40)await asyncio.sleep(0.1)finally:await reader.stop()asyncio.run(main())

4.2 同步封装 XBoxControllerReaderSync

import pygame
import threading
import time
from typing import List, Tupleclass XBoxControllerReaderSync:def __init__(self, joy_device_index: int = 0, poll_interval: float = 0.05):self.joy_device_index = joy_device_indexself.poll_interval = poll_intervalself.button_states: List[int] = []self.ball_states: List[Tuple[int, int]] = []self.axis_states: List[float] = []self.hat_states: List[Tuple[int, int]] = []self._running = Falseself._thread = Noneself._joystick = Nonedef start(self):self._running = Trueself._thread = threading.Thread(target=self._poll_loop, daemon=True)self._thread.start()def stop(self):self._running = Falseif self._thread:self._thread.join()pygame.quit()print("GamepadReader stopped and pygame quit.")def _poll_loop(self):print("Initializing pygame...")pygame.init()pygame.joystick.init()count = pygame.joystick.get_count()print(f"Detected {count} joystick(s)")if count == 0:raise RuntimeError("未检测到任何游戏手柄,pygame.joystick.get_count()=0")self._joystick = pygame.joystick.Joystick(self.joy_device_index)self._joystick.init()print(f"Joystick {self._joystick.get_name()} initialized.")while self._running:try:pygame.event.pump()self.button_states = [self._joystick.get_button(i)for i in range(self._joystick.get_numbuttons())]self.ball_states = [self._joystick.get_ball(i)for i in range(self._joystick.get_numballs())]self.axis_states = [self._joystick.get_axis(i)for i in range(self._joystick.get_numaxes())]self.hat_states = [self._joystick.get_hat(i)for i in range(self._joystick.get_numhats())]except pygame.error as e:print(f"Pygame error during polling: {e}")time.sleep(self.poll_interval)def get_button_states(self) -> List[int]:return self.button_statesdef get_axis_states(self) -> List[float]:return self.axis_statesdef get_ball_states(self) -> List[Tuple[int, int]]:return self.ball_statesdef get_hat_states(self) -> List[Tuple[int, int]]:return self.hat_states

运行下面的代码测试:

if __name__ == "__main__":reader = GamepadReader()try:reader.start()for _ in range(100):print("Buttons:", reader.get_button_states())print("Axes:   ", reader.get_axis_states())print("Hats:   ", reader.get_hat_states())print("-" * 40)time.sleep(0.1)finally:reader.stop()

4.3 同时测试

在运行下面的测试代码前需要确保你的文件结构如下,其中 xbox_controller_async.py 存放异步封装类代码; xbox_controller_sync.py 存放同步封装类代码:

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
d-----          6/4/2025   1:38 PM                __pycache__
-a----          6/4/2025   1:18 PM           1656 test.py
-a----          6/4/2025   1:42 PM           2213 xbox_controller_async.py
-a----          6/4/2025   1:41 PM           2731 xbox_controller_sync.py

然后运行 test.py 的代码,这里会先测试异步后测试同步:

import asyncio
from xbox_controller_async import XBoxControllerReaderAsync
from xbox_controller_sync import XBoxControllerReaderSync
import asyncio
import timeasync def test_async_reader(duration=5):print("=== 异步读取开始 ===")reader = XBoxControllerReaderAsync()await reader.start()start = time.time()try:while time.time() - start < duration:print("Async - Buttons:", reader.get_button_states())print("Async - Axes:   ", reader.get_axis_states())print("Async - Hats:   ", reader.get_hat_states())print("-" * 40)await asyncio.sleep(0.1)finally:await reader.stop()print("=== 异步读取结束 ===\n")def test_sync_reader(duration=5):print("=== 同步读取开始 ===")reader = XBoxControllerReaderSync()reader.start()start = time.time()try:while time.time() - start < duration:print("Sync - Buttons:", reader.get_button_states())print("Sync - Axes:   ", reader.get_axis_states())print("Sync - Hats:   ", reader.get_hat_states())print("-" * 40)time.sleep(0.1)finally:reader.stop()print("=== 同步读取结束 ===")async def main():await test_async_reader(duration=5)await asyncio.sleep(1)test_sync_reader(duration=5)if __name__ == "__main__":asyncio.run(main())
http://www.xdnf.cn/news/12204.html

相关文章:

  • 估计二维结构的数量
  • 尝试使用gocryptfs实现大模型加密部署
  • AI书签管理工具开发全记录(十):命令行中结合ai高效添加书签
  • Vue指令修饰符、v-bind对样式控制的增强、computed计算属性、watch监视器
  • 【c++】STL-string容器的使用
  • 第九届御网杯做题笔记(misc和web)(部分题其他的要么不会要么可以用gpt可以秒)
  • redis进入后台操作、查看key、删除key
  • PostgreSQL-基于PgSQL17和11版本导出所有的超表建表语句
  • JavaScript中判断两个对象是否相同(所有属性的值是否都相同)
  • JavaWeb简介
  • Ansible常用模块和使用技巧
  • 学习笔记(23): 机器学习之数据预处理Pandas和转换成张量格式[1]
  • 前端css外边距塌陷(Margin Collapse)现象原因和解决方法
  • 【DAY39】图像数据与显存
  • Java 中创建线程主要有三种方式
  • Fast-dLLM:为扩散大模型按下加速键
  • 关于项目多语言化任务的概述
  • Manus AI 现在可以生成短片了
  • 电镀机的阳极是什么材质?
  • Windows系统下npm报错node-gyp configure got “gyp ERR“解决方法
  • 道可云人工智能每日资讯|人工智能赋能广西生态环境保护计划发布
  • JavaWeb:前端工程化-TS(TypeScript)
  • 鸿蒙任务项设置案例实战
  • 离散化思想
  • 链路聚合+VRRP
  • python入门(1)
  • 【.net core】【watercloud】树形组件combotree导入及调用
  • Visual Studio C++ 调试日志与异常定位指南
  • 时序替换实时?是否必要
  • 第16届蓝桥STEMA真题剖析-2025年4月13日Scratch初/中级组