esp32 挂载mpu6050实现加速度计
使用Nordic uart service传输数据给app。
mpu6050.py
import _thread
import time
import uasyncio as asyncio
from machine import Pin, I2C, PWM
import math# 定义I2C接口 - 尝试使用不同的引脚组合
i2c = I2C(0, scl=Pin(0), sda=Pin(1), freq=400000) # 常用的I2C引脚组合# MPU6050的I2C地址
MPU6050_ADDR = 0x68
# MPU6050寄存器地址
PWR_MGMT_1 = 0x6B
ACCEL_XOUT_H = 0x3B
GYRO_XOUT_H = 0x43
WHO_AM_I = 0x75# 添加I2C设备扫描函数
def scan_i2c_devices():print("scan I2C bus devices...")devices = i2c.scan()if len(devices) == 0:print("not find I2C device!")#print("请检查:")#print("1. MPU6050传感器是否正确连接")#print("2. I2C引脚配置是否正确")#print("3. 传感器电源是否正常")return Falseelse:print(f"find {len(devices)} I2C device")for device in devices:print(f"device addr: 0x{device:02X}")if MPU6050_ADDR in devices:print(f"find one MPU6050, it's addr:0x{MPU6050_ADDR:02X}")return Trueelse:print(f"can not find MPU6050, addr:0x{MPU6050_ADDR:02X}")return Falsedef mpu6050_init():try:# 先扫描I2C总线,确认设备存在if not scan_i2c_devices():return False# 唤醒MPU6050(默认处于睡眠模式)print("尝试唤醒MPU6050...")i2c.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00')time.sleep_ms(100)# 读取并验证设备IDdevice_id = i2c.readfrom_mem(MPU6050_ADDR, WHO_AM_I, 1)[0]if device_id == 0x68:print(f"MPU6050设备ID验证通过: 0x{device_id:02X}")else:print(f"设备ID不匹配!预期0x68,实际返回0x{device_id:02X}")print("这可能表示:")print("1. 传感器不是MPU6050")print("2. 传感器损坏")print("3. I2C通信受到干扰")return False# 配置采样率和滤波器i2c.writeto_mem(MPU6050_ADDR, 0x19, b'\x07') # SMPLRT_DIV寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1A, b'\x00') # CONFIG寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1B, b'\x00') # GYRO_CONFIG寄存器i2c.writeto_mem(MPU6050_ADDR, 0x1C, b'\x00') # ACCEL_CONFIG寄存器 (±2g量程)print("MPU6050初始化成功!")return Trueexcept OSError as e:print(f"初始化MPU6050时出错: {e}")print("这通常表示:")print("1. I2C通信失败")print("2. 传感器未正确连接")print("3. 传感器地址不正确")return Falsedef read_word(adr):high = i2c.readfrom_mem(MPU6050_ADDR, adr, 1)[0]low = i2c.readfrom_mem(MPU6050_ADDR, adr + 1, 1)[0]val = (high << 8) + lowreturn valdef read_word_2c(adr):val = read_word(adr)if (val >= 0x8000):return -((65535 - val) + 1)else:return valdef read_accel_x():return read_word_2c(ACCEL_XOUT_H)def read_accel_y():return read_word_2c(ACCEL_XOUT_H + 2)def read_accel_z():return read_word_2c(ACCEL_XOUT_H + 4)def read_gyro_x():return read_word_2c(GYRO_XOUT_H)def read_gyro_y():return read_word_2c(GYRO_XOUT_H + 2)def read_gyro_z():return read_word_2c(GYRO_XOUT_H + 4)def calculate_angles():while True:ax = read_accel_x()ay = read_accel_y()az = read_accel_z()# 计算俯仰角(Pitch)pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi# 计算翻滚角(Roll)roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.piprint(f"Pitch: {pitch:.2f}°, Roll: {roll:.2f}°")time.sleep(0.1)def pwm_thread():# 创建LED控制对象led = PWM(Pin(12), freq=1000)while True:# 渐亮for i in range(0, 1024):led.duty(i)time.sleep_ms(1)# 渐暗for i in range(1023, 0, -1):led.duty(i)time.sleep_ms(1)# 新增:将加速度值打包为字节数组(大端模式)
def pack_accel_data(ax, ay, az):"""将加速度值(short类型)打包为6字节大端格式字节数组"""data = bytearray(6)data[0] = (ax >> 8) & 0xFF # X轴高字节data[1] = ax & 0xFF # X轴低字节data[2] = (ay >> 8) & 0xFF # Y轴高字节data[3] = ay & 0xFF # Y轴低字节data[4] = (az >> 8) & 0xFF # Z轴高字节data[5] = az & 0xFF # Z轴低字节return datadef register_speed_ble_callback(callback):global _ble_callback_ble_callback = callbackasync def acc_read():# 初始化MPU6050if not mpu6050_init():print("MPU6050 init fail, exit process")returnelse:while True:try:ax = read_accel_x()ay = read_accel_y()az = read_accel_z()# 计算俯仰角(Pitch)pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi# 计算翻滚角(Roll)roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.piprint(f"Pitch: {pitch:.2f}, Roll: {roll:.2f}")# 打包并发送加速度数据if _ble_callback:accel_data = pack_accel_data(ax, ay, az)_ble_callback(accel_data)#print(f"发送加速度: X={ax}, Y={ay}, Z={az}")else:print("BLE callback not registered, data not sent")await asyncio.sleep(0.1)except Exception as e:print(f"读取传感器数据失败: {e}")await asyncio.sleep(1)if __name__ == "__main__":print('mpu6050 main')asyncio.run(acc_read())
main.py
import uasyncio as asyncio
import machine
import ubinascii
import time
from ble_advertising import advertising_payload
import mpu6050
# import at24c02
# import CD4067BMT
from micropython import const
import bluetooth
import _thread# 定义LED引脚
led = machine.PWM(machine.Pin(12))
led.freq(1000)# 蓝牙相关常量
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)# 服务和特征的UUID
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_UART_RX = (bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),bluetooth.FLAG_WRITE,
)
_UART_SERVICE = (_UART_UUID,(_UART_TX, _UART_RX),
)# 设备名称服务UUID和特征
_SPEED_AND_CADENCE_SERVICE_UUID = bluetooth.UUID("00001814-0000-1000-8000-00805f9b34fb") # speedAndCadence Access Service
_SPEED_AND_CADENCE_CHAR_UUID = bluetooth.UUID("00002a67-0000-1000-8000-00805f9b34fb") # speedAndCadence Characteristic_SPEED_AND_CADENCE_CHAR = (_SPEED_AND_CADENCE_CHAR_UUID,bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, # 允许通知
)_SPEED_AND_CADENCE_SERVICE = (_SPEED_AND_CADENCE_SERVICE_UUID,(_SPEED_AND_CADENCE_CHAR,),
)# 全局标志
pwm_running = False
program_running = True# 呼吸灯效果函数
def breathing_led():global pwm_runningwhile pwm_running:# 亮度从0到1023逐渐增加print('亮度从 0 到 1023 逐渐增加')for i in range(0, 1024):if not pwm_running:breakled.duty(i)time.sleep_ms(1)# 亮度从1023到0逐渐减小print('亮度从 1023 到 0 逐渐减小')for i in range(1023, -1, -1):if not pwm_running:breakled.duty(i)time.sleep_ms(1)# 蓝牙类
class BLEUART:def __init__(self, ble, name="mpy-uart"):self._ble = bleself._ble.active(True)self._ble.irq(self._irq)# 处理 MAC 地址mac_tuple = ble.config("mac") # 获取 (addr_type, addr_bytes)print(mac_tuple)mac_bytes = mac_tuple[1] # 提取字节部分(6字节)mac_str = ubinascii.hexlify(mac_bytes).decode().upper() # 转换为十六进制字符串short_mac = mac_str[-6:] # 取后6位(如 "FFDC22")self.current_device_name = f"F-{short_mac}" # 生成唯一名称:"left-FFDC22"#self.current_device_name = name# ble.gap_set_name(f"right-{short_mac}") # 示例名称:"right-63E1D5"# 休眠/唤醒self._last_connection_time = time.ticks_ms() # 记录最后连接时间self._wakeup_pin = machine.Pin(6, mode=machine.Pin.IN, pull=machine.Pin.PULL_UP) # 配置唤醒引脚self._wakeup_pin.irq(trigger=machine.Pin.IRQ_FALLING, handler=self._wakeup_handler) # 下降沿触发唤醒self._is_sleeping = False# 注册UART服务和加速度服务((self._handle_tx, self._handle_rx), (self._handle_accel,)) = self._ble.gatts_register_services((_UART_SERVICE, _SPEED_AND_CADENCE_SERVICE))self._is_connected = Falseself._connections = set()self._write_callback = Noneself._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])print(f"Advertising data length: {len(self._payload)} bytes") # 应 ≤ 31字节self._start_advertising()def _wakeup_handler(self, pin):"""唤醒处理函数"""print("Wakeup key(gpio0) triggered")if self._is_sleeping:print("Wakeup detected from GPIO5")self._is_sleeping = False# 停止呼吸灯线程(需全局标志控制)global ledglobal pwm_runningpwm_running = False# 重新初始化LEDled.deinit() # 先释放旧资源led = machine.PWM(machine.Pin(12))led.freq(1000)# 恢复广播self._start_advertising()self._last_connection_time = time.ticks_ms() # 重置连接时间戳def _enter_sleep(self):"""进入休眠模式"""print("Entering sleep mode...")self._stop_advertising() # 停止广播self._is_sleeping = True# 关闭非必要外设(示例:关闭LED)led.deinit()# 进入深度睡眠,等待唤醒machine.deepsleep()def _check_sleep_condition(self):"""检查是否满足休眠条件:10分钟无连接"""if self._is_connected:return # 连接中不休眠current_time = time.ticks_ms()elapsed_time = time.ticks_diff(current_time, self._last_connection_time)if elapsed_time >= 600000: # 10分钟(600000毫秒)self._enter_sleep()def _irq(self, event, data):if event == _IRQ_CENTRAL_CONNECT:conn_handle, _, _ = dataprint("New connection", conn_handle)self._connections.add(conn_handle)self._is_connected = Trueself._stop_advertising()self._last_connection_time = time.ticks_ms() # 更新最后连接时间elif event == _IRQ_CENTRAL_DISCONNECT:conn_handle, _, _ = dataprint("Disconnected", conn_handle)self._connections.remove(conn_handle)self._is_connected = Falseself._start_advertising()self._last_connection_time = time.ticks_ms() # 断开时刷新时间戳elif event == _IRQ_GATTS_WRITE:conn_handle, value_handle = dataif value_handle == self._handle_rx:data = self._ble.gatts_read(self._handle_rx)if self._write_callback:self._write_callback(data)def send(self, data):# 创建连接句柄的副本,避免在迭代过程中修改原始集合connections = list(self._connections)for conn_handle in connections:try:self._ble.gatts_notify(conn_handle, self._handle_tx, data)except Exception as e:print(f"Notify failed for conn_handle {conn_handle}: {e}")# 从连接集合中移除无效的句柄if conn_handle in self._connections:self._connections.remove(conn_handle)# 如果没有有效连接了,更新连接状态if not self._connections:self._is_connected = Falseself._start_advertising()# 新增:发送加速度数据的方法def send_acceleration(self, data):"""通过加速度特征值发送数据"""connections = list(self._connections)for conn_handle in connections:try:self._ble.gatts_notify(conn_handle, self._handle_accel, data)except Exception as e:print(f"加速度数据发送失败: {e}")if conn_handle in self._connections:self._connections.remove(conn_handle)if not self._connections:self._is_connected = Falseself._start_advertising()def is_connected(self):return len(self._connections) > 0def _start_advertising(self, interval_us=500000):"""开始广播"""if not self._is_sleeping:print(f"Starting advertising with name: {self.current_device_name}")self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])self._ble.gap_advertise(interval_us, adv_data=self._payload)else:print("Still sleeping, skip advertising")def _stop_advertising(self):"""停止广播"""print("Stopping advertising")self._ble.gap_advertise(None)def on_write(self, callback):self._write_callback = callbackdef _change_device_name(self, new_name):# 限制名称长度if len(new_name) > 20:new_name = new_name[:20]self.current_device_name = new_name# 如果没有连接,重新开始广播使用新名称if not self.is_connected():self._start_advertising()print(f"Device name changed to: {new_name}")# 定义数据发送函数
def send_acceleration_data_via_ble(data):if uart.is_connected():uart.send_acceleration(data)else:print("BLE not connected, data not sent")# 新增全局共享标志(控制扫描线程启停)
flag_scan = True # 默认开启扫描
scan_lock = _thread.allocate_lock() # 添加锁保证线程安全# 蓝牙控制回调函数
def ble_callback(data):global pwm_running, program_running, flag_scancommand = data.decode().strip().lower()print(f"Received command: {command}")if command == 'on':with scan_lock:flag_scan = Trueif not pwm_running:pwm_running = True#_thread.start_new_thread(breathing_led, ())elif command == 'off':with scan_lock:flag_scan = Falseif pwm_running:pwm_running = Falseled.duty(0)elif command == 'stop':program_running = False# 初始化蓝牙
ble = bluetooth.BLE()
uart = BLEUART(ble)
uart.on_write(ble_callback)# 主循环
#while program_running:
# # if not uart.is_connected() and not uart._is_sleeping:
# # uart._check_sleep_condition() # 主循环中定期检查休眠条件
# time.sleep(1)async def peripheral_task():"""蓝牙外设任务,处理广告和连接"""while True:print("Peripheral task running...")await asyncio.sleep(1)async def central_task():"""蓝牙中心任务,处理扫描和连接"""while True:print("Central task running...")await asyncio.sleep(1)async def main():"""主函数,负责协调所有异步任务"""try:# 创建任务peripheral = asyncio.create_task(peripheral_task())central = asyncio.create_task(central_task())acc_task = asyncio.create_task(mpu6050.acc_read())# 等待任务完成(这里不会完成,因为是无限循环)await asyncio.gather(peripheral, central, acc_task)except KeyboardInterrupt:print("Interrupted by user")finally:# 清理工作print("Cleaning up...")# 注册回调函数到matrixScan模块
mpu6050.register_speed_ble_callback(send_acceleration_data_via_ble)
if __name__ == "__main__":print('main')# 启动主事件循环asyncio.run(main())