使用Python做bootloader下载程序
本教程使用 PEAK CAN(PCAN)硬件、Python 及相关第三方库实现 bootloader 下载功能。该工具非常适合研发阶段:脚本可根据需要方便地修改,避免频繁改动带图形界面的上位机;量产阶段仍推荐使用带界面的上位机。
一、Python库
1、python-can支持的CAN设备
推荐 python-can 4.6 及以上版本,Python>=3.8。本教程以 PEAK CAN 为例【需先安装相关驱动,并把对应位数(32bit/64bit,需与 Python 解释器一致)的 PCANBasic.dll 放到系统环境变量 PATH 所包含的目录中】,其他设备需要修改:
can_channel、bustype(新版 python-can 中该参数名为 interface)设置,详见 python-can 官方文档。
2、udsoncan
实现UDS的封装 [ISO 14229],分客户端和服务端,该教程使用客户端,ECU实现服务端。
3、can-isotp
实现 CAN-TP 层 [ISO 15765-2] 的操作:单帧、首帧、连续帧、流控帧等。
4、bincopy
实现Motorola S-Record, Intel HEX 和 binary 文件的解析
5、代码
import logging
import os
import time

import bincopy # pip install bincopy, for reading S19 files
import can # pip install python-can==4.6
import isotp # pip install can-isotp
import tqdm # pip install tqdm, for progress bar, py3.8 pip install tqdm -i https://mirrors.aliyun.com/pypi/simple/
import udsoncan # pip install udsoncan
from can.bit_timing import BitTimingFd
from udsoncan.client import Client
from udsoncan.connections import PythonIsoTpConnection
二、自定义函数
1、安全算法示例 - CRC16
可根据实际需要修改该安全算法【如借用AI编程转化C代码为python代码】
def CRC16(level, seed, params):
    """Derive the security-access key as a bitwise, MSB-first CRC-16 of the seed.

    The signature (level, seed, params) is the one used for the
    ``security_algo`` entry of the UDS client configuration.

    Args:
        level: Requested security level. Not used by this algorithm.
        seed: Seed bytes received from the ECU.
        params: Mapping with the keys ``'initial_value'`` (CRC start value)
            and ``'polynomial'`` (generator polynomial). An optional
            ``'byte_order'`` entry (``'big'`` or ``'little'``) overrides the
            module-level ``BYTE_ORDER`` for the returned key.

    Returns:
        bytes: The 2-byte key.
    """
    crc = params['initial_value']
    polynomial = params['polynomial']
    for byte in seed:
        # Feed the next seed byte into the upper half of the 16-bit register.
        crc ^= (byte << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ polynomial
            else:
                crc <<= 1
            crc &= 0xFFFF  # Ensure CRC remains 16-bit
    # The module-level default is only looked up when params does not override it.
    byte_order = params.get('byte_order') or BYTE_ORDER
    key = crc.to_bytes(2, byteorder=byte_order)  # Convert to 2-byte representation
    print(f" - CRC16 used <Seed>: {seed.hex()}" + f" with <Key>: {key.hex()}")
    return key
三、配置数据
# configure data
ROUTINE_ID_ERASE_FLASH = 0xFF22  # Erase Flash Routine ID, [Update for project]
SUB_FUNCTION_ERASE = 0x01  # Subfunction for Erase Flash, [Update for project]
ROUTINE_ID_CHECK_CONSISTENCY = 0xFF33  # Check Consistency Routine ID, [Update for project]
SUB_FUNCTION_CHECK = 0x01  # Subfunction for Check Consistency, [Update for project]
BYTE_ORDER = 'big'  # Byte order for multi-byte values
CHUNK_SIZE = 1024  # Size of data chunks for transfer
SECURITY_LEVEL = 0x01  # Security level 0x01 = Security Access Level 1
ECU_ADDRESS_SIZE = 32  # Address size of ECU in bits, 32 bits MCU
ECU_ADDRESS_TYPE = 4  # Address width of ECU in bytes, 4 bytes (32 bits) MCU
CAN_FD_ENABLE = True
CAN_FRAME_SIZE = 64 if CAN_FD_ENABLE else 8  # CAN frame size: 8 bytes for classic CAN, 64 bytes for CAN FD
CAN_FRMAE_SIZE = CAN_FRAME_SIZE  # Misspelled legacy name, kept so existing references keep working
TIMING_FD = BitTimingFd(
    f_clock=40000000,
    # Nominal phase: 40 MHz / (4 * (1 + 15 + 4)) = 500 kbit/s
    nom_brp=4, nom_tseg1=15, nom_tseg2=4, nom_sjw=3,
    # Data phase: 40 MHz / (1 * (1 + 15 + 4)) = 2 Mbit/s
    data_brp=1, data_tseg1=15, data_tseg2=4, data_sjw=3,
)
CRC16_DICT = dict(initial_value=0xFFFF, polynomial=0x1021)  # CRC-16-CCITT polynomial

# configure ISO-TP
ISO_PARAMETERS = {
    'stmin': 0,  # Minimum time between frames in milliseconds
    'can_fd': CAN_FD_ENABLE,  # Enable CAN FD support
    # Number of consecutive frames the sender may transmit before a new flow
    # control message is required.
    # NOTE(review): the value is tied to the frame size (64) although the
    # original comment spoke of 8 consecutive frames -- confirm the intent.
    'blocksize': CAN_FRMAE_SIZE,
    'wftmax': 0,
    'tx_data_length': CAN_FRMAE_SIZE,  # Maximum length of data in a single CAN frame
    'tx_data_min_length': 8,  # Minimum length of data in a single CAN frame, inhibit DLC below 8 bytes [ECU boot issue]
    'tx_padding': 0xFF,  # Padding byte for data frames
    'rx_flowcontrol_timeout': 2000,
    'rx_consecutive_frame_timeout': 2000,
    'max_frame_size': CHUNK_SIZE,  # Maximum size of a single frame
    'bitrate_switch': True,  # Enable bitrate switching for CAN FD
}

# configure UDS
UDS_PARAMETERS = {
    'security_algo': CRC16,  # Security algorithm
    'security_algo_params': CRC16_DICT,  # Parameters for the security algorithm
    'use_server_timing': False,  # Use server timing for response timeouts
    'p2_timeout': 0.5,  # Timeout for the sender to wait for response message
    'p2_star_timeout': 6.0,
    'request_timeout': 10,
}
四、CANBootloader 类
1、__init__
def __init__(self, can_channel='PCAN_USBBUS1', bitrate=500000, func_tx_id=0x701, phy_tx_id=0x702, rx_id=0x703):
    """Store the connection settings; no hardware is touched here.

    Args:
        can_channel: Channel name handed to python-can; the default
            'PCAN_USBBUS1' is for PCAN-USB.
        bitrate: Bitrate handed to python-can when the bus is opened.
        func_tx_id: CAN ID used to transmit functionally addressed requests.
        phy_tx_id: CAN ID used to transmit physically addressed requests.
        rx_id: CAN ID on which responses are received.
    """
    self.can_channel = can_channel  # default is 'PCAN_USBBUS1' for PCAN-USB
    self.bitrate = bitrate
    self.func_tx_id = func_tx_id
    self.phy_tx_id = phy_tx_id
    self.rx_id = rx_id
    # The bus, ISO-TP stacks and UDS clients stay None until setup_can() creates them.
    self.bus = None
    self.func_client = None
    self.phy_client = None
    self.phy_stack = None
    self.func_stack = None
2、setup_can
def setup_can(self, notifier_enable=True):
    """Open the CAN bus, start logging and create both UDS clients.

    One notifier feeds the log file (and optionally the console) as well as
    the two ISO-TP stacks: one for the functional and one for the physical
    request ID. Both clients are opened before returning.

    Args:
        notifier_enable: When True every CAN message is additionally printed
            to the console through ``can.Printer``. Messages are written to
            the log file in either case.
    """
    # Create the log directory first so can.Logger can open its file.
    current_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
    log_dir = os.path.join('.', 'LogFile')
    os.makedirs(log_dir, exist_ok=True)
    # Log file path format in [.asc/.blf/.csv/.db/.mf4 (need asammdf)/.trc/.txt]
    log_file = os.path.join(log_dir, f'Boot_{current_time}.csv')

    # 'interface' replaces the deprecated 'bustype' keyword of python-can.
    # NOTE(review): TIMING_FD describes the bit timing explicitly while
    # data_bitrate=500000 is still passed -- confirm which one the driver honours.
    self.bus = can.interface.Bus(
        channel=self.can_channel,
        interface='pcan',
        bitrate=self.bitrate,
        fd=CAN_FD_ENABLE,
        timing=TIMING_FD,
        data_bitrate=500000,
        receive_own_messages=True,
    )

    listeners = [can.Logger(log_file)]
    if notifier_enable:
        listeners.insert(0, can.Printer())  # Echo the traffic on the console as well
    notifier = can.Notifier(self.bus, listeners, timeout=0.5)

    # Configure ISOTP stack for functional and physical channels
    func_address = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=self.rx_id, txid=self.func_tx_id)
    phy_address = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=self.rx_id, txid=self.phy_tx_id)

    # functional channel
    self.func_stack = isotp.NotifierBasedCanStack(
        bus=self.bus, params=ISO_PARAMETERS, notifier=notifier, address=func_address)
    self.func_stack.set_sleep_timing(0, 0)
    self.func_client = Client(
        PythonIsoTpConnection(self.func_stack),
        request_timeout=UDS_PARAMETERS['request_timeout'])  # Set request timeout to n seconds
    self.func_client.config['p2_timeout'] = UDS_PARAMETERS['p2_timeout']
    self.func_client.open()

    # physical channel
    self.phy_stack = isotp.NotifierBasedCanStack(
        bus=self.bus, params=ISO_PARAMETERS, notifier=notifier, address=phy_address)
    self.phy_client = Client(
        PythonIsoTpConnection(self.phy_stack),
        request_timeout=UDS_PARAMETERS['request_timeout'])  # Set request timeout to n seconds
    # Only the physical client needs the timing and security-access settings.
    for key in ('p2_timeout', 'p2_star_timeout', 'use_server_timing',
                'security_algo', 'security_algo_params'):
        self.phy_client.config[key] = UDS_PARAMETERS[key]
    self.phy_client.open()
3、close_can
def close_can(self):
    """Close both UDS clients, stop the notifier(s) and shut the bus down.

    Safe to call when setup never ran or only partly completed: members that
    are still None are skipped. A notifier shared by both stacks is stopped
    only once, and the bus is shut down even if closing a client fails.
    """
    try:
        for client in (self.func_client, self.phy_client):
            if client is not None:
                client.close()

        stopped = []
        for stack in (self.func_stack, self.phy_stack):
            notifier = getattr(stack, 'notifier', None)
            # Both stacks may reference the very same notifier object.
            if notifier is None or any(notifier is done for done in stopped):
                continue
            notifier.stop()  # Stop the notifier
            stopped.append(notifier)
    finally:
        if self.bus is not None:
            self.bus.shutdown()
4、flash_ready
def flash_ready(self):
    """Bring the ECU into a state in which it accepts a download.

    Sequence: session 0x03 and communication control (0x03, 0x03) on the
    functional client, then session 0x02 and security access on the physical
    client.

    Returns:
        bool: True when the security access was answered positively, False
        on a timeout, a negative response or a missing/non-positive reply.
    """
    try:
        self.func_client.change_session(0x03)
        self.func_client.communication_control(0x03, 0x03)
        self.phy_client.change_session(0x02)
        time.sleep(0.5)  # Pause after the session change before requesting the seed
        resp = self.phy_client.unlock_security_access(SECURITY_LEVEL)
    except udsoncan.exceptions.TimeoutException as e:
        # Report the timeout instead of failing without any trace.
        print(f"[BT log]: Timeout while preparing the ECU: {e}")
        return False
    except udsoncan.exceptions.NegativeResponseException as e:
        print(e.response)
        return False

    if resp is not None and resp.positive:
        return True
    print("[BT log]: No Seed Received")
    return False
5、flash_post
def flash_post(self):
    """Reset the ECU after flashing and wait one second.

    A negative response or a timeout is reported on the console and
    otherwise ignored; nothing is returned.
    """
    print("[BT log] : ECU rest...")
    try:
        self.phy_client.ecu_reset(0x01)
        time.sleep(1.0)  # Wait after the reset request
    except udsoncan.exceptions.NegativeResponseException as e:
        print(e.response)
    except udsoncan.exceptions.TimeoutException as e:
        # Report the timeout instead of failing without any trace.
        print(f"[BT log]: Timeout while resetting the ECU: {e}")
6、flash_driver
def flash_driver(self, drv_data):
    """Download the flash driver image through the physical client.

    Issues request_download for the image's address range, sends the binary
    in CHUNK_SIZE pieces via transfer_data and finishes with
    request_transfer_exit.

    Args:
        drv_data: Image object providing ``minimum_address``,
            ``maximum_address`` and ``as_binary()`` (the main script passes a
            ``bincopy.BinFile``).

    Returns:
        bool: True when all requests were answered positively, False on a
        timeout or a negative response.
    """
    try:
        drv_data_size = drv_data.maximum_address - drv_data.minimum_address
        drv_mem = udsoncan.MemoryLocation(
            drv_data.minimum_address, drv_data_size,
            address_format=ECU_ADDRESS_SIZE, memorysize_format=ECU_ADDRESS_SIZE)
        self.phy_client.request_download(drv_mem, dfi=None)  # Request Download

        progress = tqdm.tqdm(
            self.split_into_chunks(drv_data.as_binary(), CHUNK_SIZE),
            desc=' - Transfer Data to RAM...',
        )
        # The block counter starts at 1 and is truncated to one byte.
        for block_no, chunk in enumerate(progress, start=1):
            self.phy_client.transfer_data(block_no & 0xFF, bytes(chunk))  # Transfer Data

        self.phy_client.request_transfer_exit()  # Request Transfer Exit
    except udsoncan.exceptions.TimeoutException as e:
        # Report the timeout instead of failing without any trace.
        print(f"[BT log]: Timeout while downloading the flash driver: {e}")
        return False
    except udsoncan.exceptions.NegativeResponseException as e:
        print(e.response)
        return False
    return True
7、flash_firmware
def flash_firmware(self, firmware_data, verify_of_data, verify_of_data_size):
    """Erase the target range, download the firmware and run the consistency check.

    Steps: erase routine (start address + size), request_download, chunked
    transfer_data, request_transfer_exit, then the check routine with start
    address + size + verification value.

    Args:
        firmware_data: Image object providing ``minimum_address``,
            ``maximum_address`` and ``as_binary()`` (the main script passes a
            ``bincopy.BinFile``).
        verify_of_data: Integer verification value sent to the check routine.
        verify_of_data_size: Number of bytes used to encode ``verify_of_data``.

    Returns:
        bool: True when every request was answered positively, False on a
        timeout or a negative response.
    """
    try:
        firmware_first_address = firmware_data.minimum_address
        # maximum_address has plus 1 byte
        firmware_data_size = firmware_data.maximum_address - firmware_data.minimum_address

        address_bytes = firmware_first_address.to_bytes(ECU_ADDRESS_TYPE, byteorder=BYTE_ORDER)
        size_bytes = firmware_data_size.to_bytes(ECU_ADDRESS_TYPE, byteorder=BYTE_ORDER)
        verify_of_data_bytes = verify_of_data.to_bytes(verify_of_data_size, byteorder=BYTE_ORDER)

        # Erase exactly the range that is about to be written.
        erase_record = address_bytes + size_bytes
        self.phy_client.routine_control(ROUTINE_ID_ERASE_FLASH, SUB_FUNCTION_ERASE, erase_record)

        print("[BT log] : Starting firmware flashing...")
        firmware_mem = udsoncan.MemoryLocation(
            firmware_first_address, firmware_data_size,
            address_format=ECU_ADDRESS_SIZE, memorysize_format=ECU_ADDRESS_SIZE)
        self.phy_client.request_download(firmware_mem, dfi=None)

        chunks = self.split_into_chunks(firmware_data.as_binary(), CHUNK_SIZE)
        progress = tqdm.tqdm(chunks, desc=' - Transfer Data to write flash...')
        # The block counter starts at 1 and wraps modulo 256.
        for block_no, chunk in enumerate(progress, start=1):
            self.phy_client.transfer_data(block_no % 256, bytes(chunk))  # Transfer Data
        # The previous message called this the "driver" although the firmware was transferred.
        print("[BT log]: firmware flashing completed successfully.")
        self.phy_client.request_transfer_exit()

        print("[BT log]: Check Consistency of firmware in ECU.")
        check_record = address_bytes + size_bytes + verify_of_data_bytes
        self.phy_client.routine_control(ROUTINE_ID_CHECK_CONSISTENCY, SUB_FUNCTION_CHECK, check_record)
    except udsoncan.exceptions.TimeoutException as e:
        # Report the timeout instead of failing without any trace.
        print(f"[BT log]: Timeout while flashing the firmware: {e}")
        return False
    except udsoncan.exceptions.NegativeResponseException as e:
        print(e.response)
        return False
    return True
8、flash_run
def flash_run(self, flash_drv_data, firmware_data, verify_of_data, verify_of_data_size=None):
    """Run the complete sequence: prepare ECU, flash driver, flash firmware, reset.

    Each stage runs only if the previous one succeeded; the first failing
    stage is reported on the console.

    Args:
        flash_drv_data: Flash driver image, forwarded to flash_driver().
        firmware_data: Firmware image, forwarded to flash_firmware().
        verify_of_data: Verification value, forwarded to flash_firmware().
        verify_of_data_size: Size in bytes of ``verify_of_data``. When
            omitted, a module-level ``verify_of_data_size`` is used, which is
            what this method silently relied on before the parameter existed.

    Returns:
        bool: True when the firmware was flashed and the reset was requested,
        False when any stage failed.

    Raises:
        ValueError: If no size was passed and no module-level fallback exists.
    """
    if verify_of_data_size is None:
        # Backward compatibility with callers that pass only three arguments.
        verify_of_data_size = globals().get('verify_of_data_size')
        if verify_of_data_size is None:
            raise ValueError("verify_of_data_size is required (size in bytes of verify_of_data)")

    start_time = time.time()

    if not self.flash_ready():
        print("[BT log] : Flash Ready Failed")
        return False
    if not self.flash_driver(flash_drv_data):
        print("[BT log] : Flash Driver Failed")
        return False
    if not self.flash_firmware(firmware_data, verify_of_data, verify_of_data_size):
        print("[BT log] : Flash Failed")
        return False

    self.flash_post()
    time_taken = time.time() - start_time
    print("[BT log] <Info>: Firmware flashing completed successfully.")
    print(" Time Taken :%10.3f Seconds" % time_taken)
    return True
9、split_into_chunks
def split_into_chunks(data, chunk_size):chunk_list = []for i in range(0, len(data), chunk_size): chunk_list.append(data[i:i + chunk_size]) return chunk_list
五、使用CANBootloader
1、main
if __name__ == "__main__":
    # Script entry point: load both images, compute the verification value
    # and run the complete flashing sequence once.
    logging.basicConfig(level=logging.ERROR)

    # Change the paths to your firmware and driver files
    flash_drv_path = './Binary/flashdrv.s19'
    firmware_path = './Binary/Application.s19'
    flash_drv_data = bincopy.BinFile(flash_drv_path)
    firmware_data = bincopy.BinFile(firmware_path)

    # Verify firmware data setting [change for project]
    # NOTE(review): CRC32 is not defined in the visible code. From its use it
    # must return (value, size_in_bytes) -- provide it to match the ECU's check.
    # verify_of_data_size intentionally stays a module-level name: flash_run()
    # is called without it below and picks the size up from here.
    verify_of_data, verify_of_data_size = CRC32(firmware_data.as_binary())

    bootloader = CANBootloader(bitrate=500000, func_tx_id=0x701, phy_tx_id=0x702, rx_id=0x703)
    bootloader.setup_can(notifier_enable=False)
    try:
        bootloader.flash_run(flash_drv_data, firmware_data, verify_of_data)
    finally:
        # Release the CAN hardware even when flashing raises unexpectedly.
        bootloader.close_can()