基于Locust实现MQTT协议服务的压测脚本

 一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

  1. 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
  3. 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

  • 从db中查询有效可用的所有测试车辆信息数据
  • 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
  • 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
  • 脚本统计连接数、请求数、响应时间等信息写到报表中
  • 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import sslfrom paho.mqtt import client as mqtt_client# 根据不同系统进行路径适配
if os.name == "nt":path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))sys.path.insert(0, path)from GB_test.utils.mysql_operating import DB
elif os.name == "posix":sys.path.append("/app/qa_test_app/")from GB_test.utils.mysql_operating import DBfrom locust import User, TaskSet, events, task, between, run_single_userBROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超时时间
TEST_TOPIC = "test_topic"TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测试数据,仅示意BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 业务需要转换成 byte 类型后再发送# 创建队列
client_queue = queue.Queue()# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:# 把可用的车辆信息存到队列中去client_queue.put(t)def fire_success(**kwargs):"""请求成功时调用"""events.request.fire(**kwargs)def calculate_resp_time(t1, t2):"""计算响应时间"""return int((t2 - t1) * 1000)class MQTTMessage:"""已发送的消息实体类"""def __init__(self, _type, qos, topic, payload, start_time, timeout):self.type = _type,self.qos = qos,self.topic = topicself.payload = payloadself.start_time = start_timeself.timeout = timeout# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = []  # 定义存放连接断开的记录的列表容器class PublishTask(TaskSet):@taskdef task_publish(self):self.client.loop_start()topic = TEST_TOPICpayload = BYTES_DATA# 记录发送的开始时间start_time = time.time()mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)published_mid = mqtt_msg_info.mid# 将发送成功的消息内容,放入client实例的 published_message 字段self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,0,topic,payload,start_time,PUBLISH_TIMEOUT)# 发送成功回调self.client.on_publish = self.on_publish# 断开连接回调self.client.on_disconnect = self.on_disconnect@staticmethoddef on_disconnect(client, userdata, rc):""" broker连接断开,放入列表容器"""disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]disconnect_record_list.append(disconnected_info)print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))@staticmethoddef on_publish(client, userdata, mid):if mid:# 记录消息发送成功的时间end_time = time.time()# 从已发送的消息容器中,取出消息message = client.published_message.pop(mid, None)# 计算开始发送到发送成功的耗时publish_resp_time = calculate_resp_time(message.start_time, end_time)fire_success(request_type="p_success",name="client_id: " + str(client._client_id),response_time=publish_resp_time,response_length=len(message.payload),exception=None,context=None)global total_published# 成功发送累加1total_published += 1class MQTTLocustUser(User):tasks = [PublishTask]wait_time = between(2, 2)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)# 从队列中获取客户端 username 和 client_idcurrent_client = client_queue.get()self.client = mqtt_client.Client(current_client[1])self.client.username_pw_set(current_client[0], PASSWORD)# self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模拟client连接报错# 定义一个容器,存放已发送的消息self.client.published_message = {}def on_start(self):# 设置tlscontext = ssl.SSLContext(ssl.PROTOCOL_TLS)self.client.tls_set_context(context)self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)self.client.on_connect = self.on_connectdef on_stop(self):print("publish 成功, 当前已成功发送数量:{}".format(total_published))if len(disconnect_record_list) == 0:print("无断开连接的client")else:# 把断开记录里的信息写入csvwith open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:writer = csv.writer(csvfile)writer.writerow(['client_id', 'rc_status', 'disconnected_time'])for i in disconnect_record_list:writer.writerow(i)print("断开连接的client信息已写入csv文件")@staticmethoddef on_connect(client, userdata, flags, rc, props=None):if rc == 0:print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))fire_success(request_type="c_success",name='count_connected',response_time=0,response_length=0,exception=None,context=None)else:print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))fire_success(request_type="c_fail",name="client_id: " + str(client._client_id),response_time=0,response_length=0,exception=None,context=None)if __name__ == '__main__':run_single_user(MQTTLocustUser)

2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承UserTaskSet即可。

User

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

  • tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。
  • wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

  • on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。

  • on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。

TaskSet

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

注意返回的2个属性:

  • mid: 返回这个消息发送的顺序
  • rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,密码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 连接代理
  • client.publish:向代理推送消息

还用到了一些回调函数:

  • on_connect:连接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

所以最后在控制台显示的报告里就有我定义的内容了。

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1076713.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

没更新的日子也在努力呀,布局2024!

文章目录 ⭐ 没更新的日子也在努力呀⭐ 近期的一个状态 - 已圆满⭐ 又到了2024的许愿时间了⭐ 开发者要如何去 "创富" ⭐ 没更新的日子也在努力呀 感觉很久没有更新视频了,好吧,其实真的很久没有更新短视频了。最近的一两个月真的太忙了&#…

MATLAB Coder从入门到放弃

一、MATLAB Coder入门 1 MATLAB Coder是什么 从 MATLAB 代码生成 C 和 C 代码 MATLAB Coder™ 可从 MATLAB 代码生成适用于各种硬件平台(从桌面计算机系统到嵌入式硬件)的 C 和 C 代码。它支持大多数 MATLAB 语言和广泛的工具箱。您可以将生成的代码作…

SHA-512在Go中的实战应用: 性能优化和安全最佳实践

SHA-512在Go中的实战应用: 性能优化和安全最佳实践 简介深入理解SHA-512算法SHA-512的工作原理安全性分析SHA-512与SHA-256的比较结论 实际案例分析数据完整性验证用户密码存储数字签名总结 性能优化技巧1. 利用并发处理2. 避免不必要的内存分配3. 适当的数据块大小总结 与其他…

【玩转408数据结构】线性表——线性表的顺序表示(顺序表)

知识回顾 通过前文,我们了解到线性表是具有相同数据类型的有限个数据元素序列;并且,线性表只是一种逻辑结构,其不同存储形式所展现出的也略有不同,那么今天我们来了解一下线性表的顺序存储——顺序表。 顺序表的定义 …

上个月刚跟男朋友一起买了个三百万的房子,准备明年结婚,这个月他突然被裁了...

职场变动,尤其是裁员,已经成为我们无法忽视的现实。不管你是互联网大佬,还是刚入行的新人,这个问题都可能突如其来,影响到你的生活和计划。 想象一下,你和你的另一半刚刚为了将来的幸福生活,拼尽…

使用 Windows 11/10 上的最佳 PDF 转 Word 转换器释放 PDF 的潜力

毫无疑问,PDF 是最好的文档格式之一,但就像其他格式一样,有时它们确实会带来一些限制。例如,在某些情况下,您可能想要将 PDF 转换为 Word。在这种情况下,您始终可以借助 PDF 到 Word 转换器的帮助。 为了说…

python - 模块使用详解

前言 Python有非常强大的第三方库,也有非常多的内置模块帮助开发人员实现某些功能,无需开发人员自己造轮子。本文介绍Python的模块。 什么是模块 模块简单来说就是一系列功能的集合体,如果将程序的开发比喻成拼图,模块就是各种…

12.atoi函数

文章目录 函数简介函数原型 代码运行 函数简介 函数原型 int atoi(char const *string);函数把字符转化为正数 代码运行 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h>int main() {int ret 0;char str[20] "112233";ret …

Day 43 | 动态规划 1049. 最后一块石头的重量 II 、494. 目标和 、 474.一和零

1049. 最后一块石头的重量 II 题目 文章讲解 视频讲解 思路&#xff1a;dp[j] 表示容量为 j 的背包&#xff0c;最多可以背最大重量为dp[j]。 class Solution {public int lastStoneWeightII(int[] stones) {int sum 0;for (int i 0; i < stones.length; i) {sum stone…

Django模型层part two - 多表关系创建和多表操作

前言 继续上面一篇文章的内容&#xff0c;本文介绍多表操作。使用django ORM可以创建多表关系&#xff0c;并且也支持多张表之间的操作&#xff0c;以创建表关系和查询两部分说明django ORM的多表操作。以作者、图书、出版社和作者信息几张表作为案例进行说明。 创建表关系 …

视觉slam十四讲学习笔记(三)李群与李代数

1. 理解李群与李代数的概念&#xff0c;掌握 SO(3), SE(3) 与对应李代数的表示方式。 2. 理解 BCH 近似的意义。 3. 学会在李代数上的扰动模型。 4. 使用 Sophus 对李代数进行运算。 目录 前言 一、李群李代数基础 1 群 2 李代数的引出 3 李代数的定义 4 李代数 so(3…

Docker笔记-搭建Python环境、安装依赖、打包镜像、导入镜像、编写bash脚本灵活调用

说明 适合无联网的机器及多Python的机器进行部署。 制作docker版Python环境 有网络及有docker的,拉取指定版本的python如: docker pull python:3.7 安装好后进入容器: docker run -it <name> /bin/bash 使用pip安装各种依赖: pip install <name> pip in…

Python访问数据库

目录 SQLite数据库 SQLite数据类型 Python数据类型与SQLite数据类型的映射 使用GUI管理工具管理SQLite数据库 数据库编程的基本操作过程 sqlite3模块API 数据库连接对象Connection 游标对象Cursor 数据库的CRUD操作示例 示例中的数据表 无条件查询 有条件查询 插入…

重学JavaScript高级(十二):async/await-事件循环-面试高频

async/await-事件循环 前面我们学习了生成器和迭代器&#xff0c;那么在本篇文章中&#xff0c;我们主要讲解生成器与Promise的结合使用&#xff0c;从而引出async/await语法&#xff0c;同时会涉及面试中频次最高的一个知识点&#xff1a;事件循环 生成器与异步处理 首先需要…

【Chrono Engine学习总结】4-vehicle-4.1-vehicle的基本概念

由于Chrono的官方教程在一些细节方面解释的并不清楚&#xff0c;自己做了一些尝试&#xff0c;做学习总结。 1、基本介绍 Vehicle Overview Vehicle Mannel Vehicle的官方demo 1.1 Vehicle的构型 一个车辆由许多子系统构成&#xff1a;悬挂、转向、轮子/履带、刹车/油门、动…

搜索专项---最短路模型

文章目录 迷宫问题武士风度的牛抓住那头牛 一、迷宫问题OJ链接 本题思路:只需要记录各个点是有哪个点走过来的&#xff0c;就能递推得出路径。记录前驱假设从 1,1 这个点向下走到了2, 1&#xff0c;则将2,1这个点的前驱记为1,1。这样&#xff0c;将整张地图 bfs 后&#xff0c…

C++进阶(十五)C++的类型转换

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、C语言中的类型转换二、为什么C需要四种类型转换三、C强制类型转换1、static_cast2、reint…

【必看】Onlyfans如何使用搜索功能?Onlyfans如何搜索博主?如何在OnlyFans搜索HongkongDoll

1. 什么是Onlyfans OnlyFans是一种内容订阅服务平台&#xff0c;它成立于2016年。 它允许内容创作者在平台上面分享自己的创作&#xff0c;如图片、视频等等&#xff0c;用户需要支付订阅费用才能查看创作者的内容。此外&#xff0c;用户还可以通过打赏的方式来让创作者为自己…

[Python进阶] 制作动态二维码

11.1 制作动态二维码 二维码&#xff08;QR code&#xff09;是一种二维条形码&#xff08;bar code&#xff09;&#xff0c;它的起源可以追溯到20世纪90年代初。当时&#xff0c;日本的汽车工业开始使用一种被称为QR码的二维条码来追踪汽车零部件的信息。 QR码是Quick Respo…

代码随想录算法训练营Day55|392.判断子序列、115.不同的子序列

目录 392.判断子序列 思路 ​算法实现 115.不同的子序列 思路 算法实现 总结 392.判断子序列 题目链接 文章链接 思路 利用动规五部曲进行分析&#xff1a; 1.确定dp数组及其下标含义&#xff1a; dp[i][j] 表示以下标i-1为结尾的字符串s&#xff0c;和以下标j-1为结尾的…