当前位置: 首页 > news >正文

【物联网】BLE 系统架构全景图

BLE 开发完整指南:从Arduino到树莓派的完整解决方案

一、BLE 系统架构全景图

系统组成

Arduino Nano 33 IoT (Peripheral) ←BLE→ Raspberry Pi (Central) ←MQTT→ AWS Cloud↑                              ↑传感器数据采集                     数据聚合与转发执行器控制                        协议转换

二、Arduino BLE 代码详解

1. BLE_periodic_update.ino - 周期性数据更新

// 作用:定期更新特征值并通过通知推送
// 应用:实时传感器数据推送#include <ArduinoBLE.h>// 自定义UUID(重要:使用唯一UUID)
#define SERVICE_UUID        "19B10000-E8F2-537E-4F6C-D104768A1214"
#define CHARACTERISTIC_UUID "19B10001-E8F2-537E-4F6C-D104768A1214"BLEService customService(SERVICE_UUID);                
BLECharacteristic readOnlyCharacteristic(CHARACTERISTIC_UUID,BLERead | BLENotify, 20  // 可读+通知权限,最大20字节
);unsigned long lastUpdate = 0;
const unsigned long updateInterval = 1000; // 1秒更新间隔
int counter = 0;void setup() {Serial.begin(9600);// BLE初始化if (!BLE.begin()) {Serial.println("Failed to initialize BLE!");while (true);}// 设置设备标识(必须唯一)BLE.setDeviceName("YourSensorNode");BLE.setLocalName("YourSensorNode");// 配置服务与特征BLE.setAdvertisedService(customService);customService.addCharacteristic(readOnlyCharacteristic);BLE.addService(customService);// 设置初始值readOnlyCharacteristic.writeValue("Init");// 开始广播BLE.advertise();Serial.println("BLE advertising with periodic updates...");
}void loop() {BLE.poll();  // 必须调用,处理BLE事件// 定期更新数据if (millis() - lastUpdate > updateInterval) {lastUpdate = millis();counter++;// 生成新数据(中替换为传感器数据)String newValue = "Update #" + String(counter);readOnlyCharacteristic.writeValue(newValue.c_str());Serial.print("Updated: ");Serial.println(newValue);}
}

2. 改造应用

// 替换数据生成部分为传感器读取
void updateSensorData() {float temperature = dht.readTemperature();float humidity = dht.readHumidity();// JSON格式数据String sensorData = "{\"temp\":" + String(temperature) + ",\"humidity\":" + String(humidity) + "}";readOnlyCharacteristic.writeValue(sensorData.c_str());Serial.println("Sensor data updated: " + sensorData);
}

三、树莓派 Python BLE 客户端详解

1. bleak_scan.py - BLE设备扫描

# 作用:扫描周围BLE设备
# 应用:发现Arduino设备import asyncio
from bleak import BleakScannerasync def scan_ble_devices():print("Scanning for BLE devices...")devices = await BleakScanner.discover(timeout=5.0)if not devices:print("No BLE devices found.")else:print("Found devices:")for d in devices:print(f"{d.address} - {d.name}")# 运行扫描
asyncio.run(scan_ble_devices())

2. bleak_name_conn.py - 按名称连接

# 作用:通过设备名称连接并读取特征
# 应用:连接特定Arduino设备import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称片段
TARGET_NAME_FRAGMENT = "ArduinoSensor"  # 改为你的设备名# 特征UUID(必须与Arduino一致)
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"async def scan_and_connect():print("Scanning for BLE devices...")devices = await BleakScanner.discover(timeout=5.0)# 查找目标设备target_device = Nonefor d in devices:if d.name and TARGET_NAME_FRAGMENT in d.name:target_device = dbreakif not target_device:print(f"No device found with name containing '{TARGET_NAME_FRAGMENT}'.")returnprint(f"Found target: {target_device.name} ({target_device.address})")# 连接并读取数据async with BleakClient(target_device.address) as client:if client.is_connected:print("Connected successfully.")try:# 读取特征值value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"Raw value: {value}")# 尝试解码为字符串try:decoded_value = value.decode('utf-8')print("Decoded value:", decoded_value)except:print("Value is not UTF-8 string")except Exception as e:print("Failed to read characteristic:", e)else:print("Failed to connect.")asyncio.run(scan_and_connect())

3. bleak_rec_notify.py - 接收通知

# 作用:订阅特征通知并实时接收数据
# 应用:实时接收传感器数据import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称片段
TARGET_NAME_FRAGMENT = "ArduinoSensor"# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# 通知处理函数
def handle_notification(sender, data):print(f"[Notification] From {sender}: {data}")try:# 解码JSON数据(需要)decoded_data = data.decode('utf-8')print("Decoded:", decoded_data)# 在这里添加MQTT发布逻辑# mqtt_client.publish("sensors/dht", decoded_data)except Exception as e:print("Decoding error:", e)async def scan_connect_and_subscribe():print("Scanning for devices...")devices = await BleakScanner.discover(timeout=5.0)# 查找设备target_device = Nonefor d in devices:if d.name and TARGET_NAME_FRAGMENT in d.name:target_device = dbreakif not target_device:print(f"No device found with name containing '{TARGET_NAME_FRAGMENT}'.")returnprint(f"Found device: {target_device.name} ({target_device.address})")# 连接并订阅通知async with BleakClient(target_device.address) as client:if client.is_connected:print("Connected successfully.")try:# 启动通知订阅await client.start_notify(CHARACTERISTIC_UUID, handle_notification)print("Subscribed to notifications. Press Ctrl+C to stop.")# 保持连接,持续接收数据while True:await asyncio.sleep(1)except Exception as e:print("Failed to subscribe:", e)else:print("Failed to connect.")try:asyncio.run(scan_connect_and_subscribe())
except KeyboardInterrupt:print("Program stopped by user.")

4. bleak_name_write.py - 写入特征值

# 作用:向特征写入数据
# 应用:发送控制命令到Arduinoimport asyncio
from bleak import BleakClient# 设备地址(通过扫描获取)
DEVICE_ADDRESS = "ABC6F8AB-50BE-AEA6-DD14-83B9340D3C96"# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"async def control_device():async with BleakClient(DEVICE_ADDRESS) as client:print(f"Connected: {client.is_connected}")try:# 读取当前值value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"Current value: {value.decode('utf-8')}")# 写入控制命令command = "LED_ON"  # 改为你的命令await client.write_gatt_char(CHARACTERISTIC_UUID, command.encode('utf-8'))print(f"Sent command: {command}")# 验证写入结果new_value = await client.read_gatt_char(CHARACTERISTIC_UUID)print(f"New value: {new_value.decode('utf-8')}")except Exception as e:print(f"Error: {e}")asyncio.run(control_device())

5. bleak_multiconn_notify.py - 多设备连接

# 作用:同时连接多个设备并接收通知
# 应用:聚合多个传感器数据import asyncio
from bleak import BleakScanner, BleakClient# 目标设备名称关键词
TARGET_NAME_KEYWORDS = ["SensorNode1", "SensorNode2"]# 特征UUID
CHARACTERISTIC_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# 设备客户端字典
clients = {}# 创建通知处理器
def make_notification_handler(name):def handler(sender, data):try:decoded_data = data.decode('utf-8')print(f"[{name}] {decoded_data}")# 在这里添加数据聚合逻辑except:print(f"[{name}] {data}")return handler# 连接并订阅单个设备
async def connect_and_subscribe(device, name):client = BleakClient(device.address)await client.connect()print(f"Connected to {name}")# 订阅通知await client.start_notify(CHARACTERISTIC_UUID, make_notification_handler(name))clients[name] = client# 扫描并连接所有匹配设备
async def scan_and_connect_all():print("Scanning for devices...")devices = await BleakScanner.discover(timeout=5.0)tasks = []for d in devices:if d.name:for keyword in TARGET_NAME_KEYWORDS:if keyword in d.name:tasks.append(connect_and_subscribe(d, d.name))breakif not tasks:print("No matching devices found.")returnprint(f"Connecting to {len(tasks)} devices...")await asyncio.gather(*tasks)print("All devices connected. Listening for notifications...")try:# 保持运行,持续接收数据while True:await asyncio.sleep(1)except KeyboardInterrupt:print("Disconnecting...")for client in clients.values():await client.disconnect()asyncio.run(scan_and_connect_all())

四、在完整应用方案

1. Arduino端改造(传感器数据+命令控制)

// 在BLE_periodic_update基础上改造
#define DHT_CHAR_UUID    "19B10001-E8F2-537E-4F6C-D104768A1214"  // 传感器数据
#define CMD_CHAR_UUID    "19B10002-E8F2-537E-4F6C-D104768A1214"  // 命令控制// 添加命令特征
BLEStringCharacteristic cmdCharacteristic(CMD_CHAR_UUID,BLEWrite,  // 只写权限20
);void setup() {// ...其他初始化代码// 添加命令特征customService.addCharacteristic(cmdCharacteristic);// 设置初始命令值cmdCharacteristic.writeValue("READY");
}void loop() {BLE.poll();// 处理命令if (cmdCharacteristic.written()) {String command = cmdCharacteristic.value();processCommand(command);}// 定期发送传感器数据if (millis() - lastUpdate > updateInterval) {updateSensorData();lastUpdate = millis();}
}void processCommand(String command) {if (command == "LED_ON") {digitalWrite(LED_PIN, HIGH);Serial.println("LED turned ON");} else if (command == "LED_OFF") {digitalWrite(LED_PIN, LOW);Serial.println("LED turned OFF");}
}

2. 树莓派端完整方案(BLE + MQTT)

# bleread_mqttpub.py - 整合BLE读取和MQTT发布
import asyncio
from bleak import BleakScanner, BleakClient
import paho.mqtt.client as mqtt
import json# BLE配置
TARGET_NAME = "ArduinoSensor"
SENSOR_CHAR_UUID = "19B10001-E8F2-537E-4F6C-D104768A1214"# MQTT配置
MQTT_HOST = "your-ec2-ip"
MQTT_TOPIC = "ifn649/sensors/dht"# MQTT客户端初始化
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, 1883)def on_notification(sender, data):try:# 解析传感器数据sensor_data = data.decode('utf-8')print(f"Received: {sensor_data}")# 发布到MQTTmqtt_client.publish(MQTT_TOPIC, sensor_data)print(f"Published to MQTT: {sensor_data}")except Exception as e:print(f"Error processing data: {e}")async def main():# 扫描并连接设备devices = await BleakScanner.discover()target_device = Nonefor d in devices:if d.name and TARGET_NAME in d.name:target_device = dbreakif not target_device:print("Target device not found")return# 连接并订阅通知async with BleakClient(target_device.address) as client:await client.start_notify(SENSOR_CHAR_UUID, on_notification)print("Connected and subscribed. Press Ctrl+C to stop.")# 保持运行while True:await asyncio.sleep(1)# 运行主程序
try:asyncio.run(main())
except KeyboardInterrupt:print("Program stopped")mqtt_client.disconnect()

3. 命令控制端(MQTT订阅 + BLE写入)

# mqttsub_blewrite.py - MQTT订阅转BLE写入
import asyncio
from bleak import BleakScanner, BleakClient
import paho.mqtt.client as mqtt# BLE配置
TARGET_NAME = "ArduinoSensor"
CMD_CHAR_UUID = "19B10002-E8F2-537E-4F6C-D104768A1214"# MQTT配置
MQTT_HOST = "your-ec2-ip"
MQTT_TOPIC = "ifn649/commands/led"# BLE设备地址缓存
ble_device_address = Noneasync def ble_write_command(command):if not ble_device_address:print("BLE device not found")returntry:async with BleakClient(ble_device_address) as client:await client.write_gatt_char(CMD_CHAR_UUID, command.encode('utf-8'))print(f"Command sent: {command}")except Exception as e:print(f"BLE write error: {e}")def on_mqtt_message(client, userdata, msg):command = msg.payload.decode('utf-8')print(f"MQTT command received: {command}")# 异步执行BLE写入asyncio.create_task(ble_write_command(command))async def discover_ble_device():global ble_device_addressdevices = await BleakScanner.discover()for d in devices:if d.name and TARGET_NAME in d.name:ble_device_address = d.addressprint(f"Found BLE device: {d.name} ({d.address})")return Trueprint("BLE device not found")return Falseasync def main():# 发现BLE设备if not await discover_ble_device():return# 设置MQTT客户端mqtt_client = mqtt.Client()mqtt_client.on_message = on_mqtt_messagemqtt_client.connect(MQTT_HOST, 1883)mqtt_client.subscribe(MQTT_TOPIC)print("MQTT subscriber started. Waiting for commands...")mqtt_client.loop_forever()# 启动程序
asyncio.run(main())

五、调试与最佳实践

1. 调试技巧

# 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)# 异常处理
try:await client.read_gatt_char(CHARACTERISTIC_UUID)
except Exception as e:print(f"Error: {e}")# 重连逻辑

2. 性能优化

# 使用连接池管理多个设备连接
# 设置合理的超时时间
# 使用异步编程避免阻塞

3. 错误恢复

# 自动重连机制
async def resilient_connect(address, max_retries=3):for attempt in range(max_retries):try:async with BleakClient(address) as client:return clientexcept Exception as e:print(f"Connection attempt {attempt+1} failed: {e}")await asyncio.sleep(2)return None
http://www.xdnf.cn/news/1398583.html

相关文章:

  • 关于PXIe工控机XH-PXIe7312双路25G网卡
  • Docker核心概念与镜像仓库操作指南
  • FlowUs AI-FlowUs息流推出的AI创作助手
  • uniapp监听物理返回按钮事件
  • Ansible主机模式与文件导入技巧
  • C++世界的大门——基础知识总结
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(六)
  • 工业产品营销:概念、原理、流程与实践指南
  • 【浅尝Java】运算符全介绍(含除法取模运算各情况分析、位运算与移位运算分析、逻辑与条件运算符)
  • Raycast 使用指南:解锁 macOS 生产力新高度
  • Kotlin Android 水印功能实现指南:使用 Watermark 库
  • Netty 心跳与链路保活机制详解:保证高并发环境下的稳定连接
  • 互联网大厂大模型应用开发岗位面试:技术点详解与业务场景演练
  • Spark mapGroups 函数详解与多种用法示例
  • Java面试-MyBatis篇
  • 执行一条Select语句流程
  • python pyqt5开发DoIP上位机【诊断回复的函数都是怎么调用的?】
  • Jedis、Lettuce、Redisson 技术选型对比
  • 【前端教程】HTML 基础界面开发
  • Dify工作流之合同信息提取
  • 【74LS112JK触发器三进制】2022-10-8
  • 常量指针与指针常量习题(一)
  • 每日算法题【二叉树】:二叉树的最大深度、翻转二叉树、平衡二叉树
  • GROMACS 安装:详细教程来袭
  • 上层协议依赖TCP
  • 【系列10】端侧AI:构建与部署高效的本地化AI模型 第9章:移动端部署实战 - iOS
  • pdf转ofd之移花接木
  • 面试 八股文 经典题目 - Mysql部分(一)
  • jsqlparser(六):TablesNamesFinder 深度解析与 SQL 格式化实现
  • Java中使用正则表达式的正确打开方式