当前位置: 首页 > ai >正文

使用Python实现DLT645-2007智能电表协议

文章目录

      • 🌴通讯支持
      • 🌴 功能完成情况
      • 服务端架构设计
        • 一、核心模块划分
        • 二、数据层定义
        • 三、协议解析层
        • 四、通信业务层(以DLT645服务端为例)
        • 五、通信层(以TCP为例)
        • 使用例子

🌴通讯支持

功能状态
TCP客户端(方便通讯测试) 🐾
TCP服务端(方便通讯测试) 🐾
RTU主站 🐾
RTU从站 🐾

🌴 功能完成情况

功能状态
读、写通讯地址 🐾
广播校时 🐾
电能量 🐾
最大需量及发生时间 🐾
变量 🐾
事件记录 🐾
参变量 🐾
冻结量 🐾
负荷纪录 🐾

项目地址:https://gitee.com/chen-dongyu123/dlt645

服务端架构设计

一、核心模块划分
  • 数据层:模拟电表数据,存储数据标识与值的映射关系。
  • 协议解析层:负责帧的组装、拆卸、校验和转义。
  • 业务逻辑层:根据解析出的控制码和DI,执行相应操作并组织回复数据。
  • 通信层:负责底层的字节流收发(串口/TCP)。

代码结构

├── config      		# 测点json,用于初始化导入
├── src        			# 源文件夹
│   ├── common			# 存放通用函数
│   ├── model			# 数据模型
│   │   ├── data
│   │   │   └── define
│   │   └── types		# dlt645数据类型
│   ├── protocol		# 协议解析层
│   ├── service
│   │   ├── clientsvc	# dlt645客户端api及实现
│   │   └── serversvc	# dlt645服务端api及实现
│   └── transport
│       ├── client		# 客户端通讯接口,支持TCP客户端、RTU主站
│       └── server		# 服务端通讯接口,支持TCP服务端、RTU从站
└── test				# 测试文件

流程图:启动 -> 监听连接 -> 接收数据 -> 解析帧 -> 处理请求 -> 组织回复帧 -> 发送数据

二、数据层定义
  1. 通过读取config文件夹里的json导入测点,json示例如下

    [{"Di": "02010100","Name": "A相电压","Unit": "V","DataFormat": "XXX.X"},{"Di": "02010200","Name": "B相电压","Unit": "V","DataFormat": "XXX.X"},...
    ]
    
  2. 定义数据类型和映射map

    class DataItem:def __init__(self, di: int, name: str, data_format: str, value: float = 0, unit: str = '', timestamp: int = 0):self.di = diself.name = nameself.data_format = data_formatself.value = valueself.unit = unitself.timestamp = timestampdef __repr__(self):return (f"DataItem(name={self.name}, di={format(self.di, '#x')}, value={self.value}, "f"unit={self.unit},data_format={self.data_format}, timestamp={self.timestamp})")DIMap: Dict[int, DataItem] = {}
    
  3. 定义设置DataItem值接口和获取DataItem接口

    def get_data_item(di: int) -> Optional[DataItem]:"""根据 di 获取数据项"""item = DIMap.get(di)if item is None:log.info(f"未通过di {hex(di)} 找到映射")return Nonereturn itemdef set_data_item(di: int, data: Any) -> bool:"""设置指定 di 的数据项"""if data is None:log.info("data is nil")return Falseif di in DIMap:DIMap[di].value = datalog.info(f"设置数据项 {hex(di)} 成功, 值 {DIMap[di]}")return Truereturn False
    
  4. 初始化数据标识map

    def init_variable_def(VariableTypes: List[DataItem]):for date_type in VariableTypes:DIMap[date_type.di] = DataItem(di=date_type.di,name=date_type.name,data_format=date_type.data_format,unit=date_type.unit)
    
  5. 初始化DLT645相关测点数据

    EnergyTypes = []
    DemandTypes = []
    VariableTypes = []def init():global EnergyTypesEnergyTypes = initDataTypeFromJson(os.path.join(conf_path, 'energy_types.json'))DemandTypes = initDataTypeFromJson(os.path.join(conf_path, 'demand_types.json'))VariableTypes = initDataTypeFromJson(os.path.join(conf_path, 'variable_types.json'))init_energy_def(EnergyTypes)init_demand_def(DemandTypes)init_variable_def(VariableTypes)# 执行初始化
    init()
    
三、协议解析层
  1. 帧类型

    # 常量定义
    FRAME_START_BYTE = 0x68
    FRAME_END_BYTE = 0x16
    BROADCAST_ADDR = 0xAAclass Frame:def __init__(self, preamble: List[int] = None, start_flag: int = 0, addr: List[int] = None,ctrl_code: int = 0, data_len: int = 0, data: List[int] = None,check_sum: int = 0, end_flag: int = 0):self.preamble = preamble if preamble is not None else []  # 前导字节self.start_flag = start_flag  # 起始字节self.addr = addr if addr is not None else [0] * 6  # 地址字节self.ctrl_code = ctrl_code  # 控制字节self.data_len = data_len  # 数据长度字节self.data = data if data is not None else []  # 数据字节self.check_sum = check_sum  # 校验字节self.end_flag = end_flag  # 结束字节
    
  2. 协议解析

    • 数据域解码

          @classmethoddef decode_data(cls, data: bytes) -> bytes:"""数据域解码(±33H转换)"""return bytes([b - 0x33 for b in data])
      
    • 数据域编码

          @classmethoddef encode_data(cls, data: bytes) -> bytes:"""数据域编码"""return bytes([b + 0x33 for b in data])
      
    • 计算校验和

          @classmethoddef calculate_checksum(cls, data: bytes) -> int:"""校验和计算(模256求和)"""return sum(data) % 256
      
    • 构建帧

          @classmethoddef build_frame(cls, addr: bytes, ctrl_code: int, data: bytes) -> bytearray:"""帧构建(支持广播和单播)"""if len(addr) != 6:raise ValueError("地址长度必须为6字节")buf = []buf.append(FRAME_START_BYTE)buf.extend(addr)buf.append(FRAME_START_BYTE)buf.append(ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(data)buf.append(len(encoded_data))buf.extend(encoded_data)# 计算校验和check_sum = DLT645Protocol.calculate_checksum(bytes(buf))buf.append(check_sum)buf.append(FRAME_END_BYTE)return bytearray(buf)
      
    • 字节数组反序列化Frame结构体

          @classmethoddef deserialize(cls, raw: bytes) -> Optional[Frame]:"""将字节切片反序列化为 Frame 结构体"""# 基础校验if len(raw) < 12:raise Exception(f"frame too short: {raw}")# 帧边界检查(需考虑前导FE)try:start_idx = raw.index(FRAME_START_BYTE)except ValueError:log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx == -1 or start_idx + 10 >= len(raw):log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx + 7 >= len(raw) or raw[start_idx + 7] != FRAME_START_BYTE:log.error(f"missing second start flag: {raw}")raise Exception("missing second start flag")# 构建帧结构frame = Frame()frame.start_flag = raw[start_idx]frame.addr = raw[start_idx + 1:start_idx + 7]frame.ctrl_code = raw[start_idx + 8]frame.data_len = raw[start_idx + 9]# 数据域提取(严格按协议1.2.5节处理)data_start = start_idx + 10data_end = data_start + frame.data_lenif data_end > len(raw) - 2:log.error(f"invalid data length {frame.data_len}")raise Exception(f"invalid data length {frame.data_len}")# 数据域解码(需处理加33H/减33H)frame.data = DLT645Protocol.decode_data(raw[data_start:data_end])# 校验和验证(从第一个68H到校验码前)checksum_start = start_idxchecksum_end = data_endif checksum_end >= len(raw):log.error(f"frame truncated: {raw}")raise Exception(f"frame truncated: {raw}")calculated_sum = DLT645Protocol.calculate_checksum(raw[checksum_start:checksum_end])if calculated_sum != raw[checksum_end]:log.error(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")raise Exception(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")# 结束符验证if checksum_end + 1 >= len(raw) or raw[checksum_end + 1] != FRAME_END_BYTE:log.error(f"invalid end flag: {raw[checksum_end + 1]}")raise Exception(f"invalid end flag: {raw[checksum_end + 1]}")# 转换为带缩进的JSONlog.info(f"frame: {frame}")return frame
      
    • Frame 结构体序列化为字节数组

          @classmethoddef serialize(cls, frame: Frame) -> Optional[bytes]:"""将 Frame 结构体序列化为字节切片"""if frame.start_flag != FRAME_START_BYTE or frame.end_flag != FRAME_END_BYTE:log.error(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")raise Exception(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")buf = []# 写入前导字节buf.extend(frame.preamble)# 写入起始符buf.append(frame.start_flag)# 写入地址buf.extend(frame.addr)# 写入第二个起始符buf.append(frame.start_flag)# 写入控制码buf.append(frame.ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(frame.data)# 写入数据长度buf.append(len(encoded_data))# 写入编码后的数据buf.extend(encoded_data)# 计算并写入校验和check_sum = DLT645Protocol.calculate_checksum(bytearray(buf))buf.append(check_sum)# 写入结束符buf.append(frame.end_flag)return bytearray(buf)
      
    四、通信业务层(以DLT645服务端为例)
    1. 定义电表Service

      class MeterServerService:def __init__(self,server: Union[TcpServer, RtuServer],address: bytearray = bytearray([0x00] * 6),password: bytearray = bytearray([0x00] * 4)):self.server = server	self.address = addressself.password = password
      
    2. 设置值接口,以电能为例

          # 写电能量def set_00(self, di: int, value: float) -> bool:ok = set_data_item(di, value)if not ok:log.error(f"写电能量失败")return ok
      
    3. 处理数据函数

          # 处理读数据请求(协议与业务分离)def handle_request(self, frame):# 1. 验证设备if not self.validate_device(frame.addr):log.info(f"验证设备地址: {bytes_to_spaced_hex(frame.addr)} 失败")raise Exception("unauthorized device")# 2. 根据控制码判断请求类型if frame.ctrl_code == CtrlCode.BroadcastTimeSync:  # 广播校时log.info(f"广播校时: {frame.Data.hex(' ')}")self.set_time(frame.Data)return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, frame.data)elif frame.ctrl_code == CtrlCode.CtrlReadData:# 解析数据标识di = frame.datadi3 = di[3]if di3 == 0x00:  # 读取电能# 构建响应帧res_data = bytearray(8)# 解析数据标识为 32 位无符号整数data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4]  # 仅复制前 4 字节数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x01:  # 读取最大需量及发生时间res_data = bytearray(12)data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4]  # 返回数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:7] = bcd_value[:3]# 需量发生时间res_data[7:12] = time_to_bcd(time.time())log.info(f"读取最大需量及发生时间: {res_data}")return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x02:  # 读变量data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")# 变量数据长度data_len = 4data_len += (len(data_item.data_format) - 1) // 2  # (数据格式长度 - 1 位小数点)/2# 构建响应帧res_data = bytearray(data_len)res_data[:4] = frame.data[:4]  # 仅复制前 4 字节value = data_item.value# 转换为 BCD 码(小端序)bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:data_len] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))else:log.info(f"unknown: {hex(di3)}")return Exception("unknown di3")elif frame.ctrl_code == CtrlCode.ReadAddress:# 构建响应帧res_data = self.address[:6]return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)elif frame.ctrl_code == CtrlCode.WriteAddress:res_data = b''  # 写通讯地址不需要返回数据# 解析数据addr = frame.data[:6]self.set_address(addr)  # 设置通讯地址return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)else:log.info(f"unknown control code: {hex(frame.ctrl_code)}")raise Exception("unknown control code")
      
    4. 注入通讯协议到Service

      def new_tcp_server(ip: str, port: int, timeout: int) -> MeterServerService:# 1. 先创建 TcpServer(不依赖 Service)tcp_server = TcpServer(ip, port, timeout, None)# 2. 创建 MeterServerService,注入 TcpServer(作为 Server 接口)meter_service = MeterServerService(tcp_server)# 3. 将 MeterServerService 注入回 TcpServertcp_server.service = meter_servicereturn meter_servicedef new_rtu_server(port: str, dataBits: int, stopBits: int, baudRate: int, parity: str,timeout: float) -> MeterServerService:rtu_server = RtuServer(port, dataBits, stopBits, baudRate, parity, timeout)# 2. 创建 MeterServerService,注入 RtuServer(作为 Server 接口)meter_service = MeterServerService(rtu_server)# 3. 将 MeterServerService 注入回 RtuServerrtu_server.service = meter_servicereturn meter_service
      
    五、通信层(以TCP为例)
    1. 定义TCP服务端

      class TcpServer:def __init__(self, ip: str, port: int, timeout: float, service):self.ip = ipself.port = portself.timeout = timeoutself.ln = Noneself.service = service	# Dlt645业务
      
    2. 处理数据

          def handle_connection(self, conn):try:while True:try:# 接收数据buf = conn.recv(256)if not buf:breaklog.info(f"Received data: {bytes_to_spaced_hex(buf)}")# 协议解析try:frame = DLT645Protocol.deserialize(buf)except Exception as e:log.error(f"Error parsing frame: {e}")continue# 业务处理try:resp = self.service.handle_request(frame)except Exception as e:log.error(f"Error handling request: {e}")continue# 响应if resp:try:conn.sendall(resp)log.info(f"Sent response: {bytes_to_spaced_hex(resp)}")except Exception as e:log.error(f"Error writing response: {e}")except socket.timeout:breakexcept Exception as e:log.error(f"Error handling connection: {e}")finally:try:conn.close()except Exception as e:log.error(f"Error closing connection: {e}")
      
    3. 启动服务端

          def start(self):try:# 创建 TCP 套接字self.ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置地址可重用self.ln.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定地址和端口self.ln.bind((self.ip, self.port))# 开始监听self.ln.listen(5)log.info(f"TCP server started on port {self.port}")while True:try:# 接受连接conn, addr = self.ln.accept()log.info(f"Accepted connection from {addr}")# 设置超时时间conn.settimeout(self.timeout)# 启动新线程处理连接import threadingthreading.Thread(target=self.handle_connection, args=(conn,)).start()except socket.error as e:if isinstance(e, socket.timeout):continuelog.error(f"Failed to accept connection: {e}")if not e.errno == 10038:  # 非套接字关闭错误return eexcept Exception as e:log.error(f"Failed to start TCP server: {e}")return e
      
    4. 关闭服务端

          def stop(self):if self.ln:log.info("Shutting down TCP server...")try:self.ln.close()except Exception as e:log.error(f"Error closing server: {e}")return ereturn None
      
    使用例子
    1. 启动服务端

      if __name__ == '__main__':server_svc = new_tcp_server("127.0.0.1", 8021, 3000)# server_svc = new_rtu_server("COM4", 8, 1, 9600, "N", 1000)server_svc.set_00(0x00000000, 100.0)server_svc.set_02(0x02010100, 86.0)server_svc.server.start()
      

      在这里插入图片描述

    2. 使用模拟DLT645客户端软件测试

      在这里插入图片描述

http://www.xdnf.cn/news/18559.html

相关文章:

  • 基于php的萌宠社区网站的设计与实现、基于php的宠物社区论坛的设计与实现
  • 【QT入门到晋级】进程间通信(IPC)-共享内存
  • 十六进制与内存地址,数值的差异为1,表示差1个字节,而不是数值差异2^8才表示差一个字节
  • 03-鸿蒙架构与编程模型
  • 《Linux 网络编程二:UDP 与 TCP 的差异、应用及问题应对》
  • Meta AI 剧变:汪滔挥刀重组,Llama 开源路线告急,超级智能梦碎还是重生?
  • 深度学习(深度神经网络)Pytorch框架
  • LoRA 微调
  • Trip Footprint旅行足迹App技术架构全解析
  • 迭代器模式与几个经典的C++实现
  • 机器学习案例——预测矿物类型(模型训练)
  • 【JVM内存结构系列】一、入门:先搞懂整体框架,再学细节——避免从一开始就混淆概念
  • Linux服务器利用Systemd配置定时任务
  • FLOPs、TFLOPs 与 TOPS:计算能力单位
  • 纠删码技术,更省钱的分布式系统的可靠性技术
  • JAVA核心基础篇-枚举
  • Claude Code 新手使用入门教程
  • 【Kubernetes知识点】资源配额与访问控制
  • Qt + windows+exe+msvc打包教程
  • AI热点周报(8.17~8.23):Pixel 10“AI周”、DeepSeek V3.1发布,英伟达再起波澜?
  • 【python】get_dummies()用法
  • AI大模型 限时找我领取
  • 心灵笔记:人生管理模型
  • 简单AI:搜狐公司旗下AI绘画产品
  • 均匀实心球内部引力与半径成正比的牛顿壳层定理证明
  • MATLAB实现CNN-LSTM-Attention 时序和空间特征结合-融合注意力机制混合神经网络模型的风速预测
  • c语言学习_数组使用_扫雷1
  • 1.十天通关常见算法100题(第一天)
  • 科研笔记:博士生手册
  • 【每天一个知识点】训推一体机