当前位置: 首页 > news >正文

33. 自动化测试开发之使用mysql异步连接池实现mysql数据库操作

Python自动化测试之数据库操作封装深度解析

一、核心类结构设计

1.1 DataBase基类实现

class DataBase:def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):# 参数存储self._args, self._kwargs = args, kwargsself._autocommit = autocommit  # ✅ 自动提交设置if database.lower() == 'mysql':self._database = create_pool  # 🎯 aiomysql异步连接池self._ini = INIReader(DATABASE_INI_PATH).data  # 📖 配置读取self._loop = asyncio.new_event_loop()  # 🔄 创建独立事件循环asyncio.set_event_loop(self._loop)self._mysql_pool = self.mysql_pool  # ⚡ 立即初始化elif database.lower() == 'oracle':self._database = SessionPool  # 🗃️ cx_Oracle连接池self._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool  # ⚡ 立即初始化
参数配置说明表
参数类型默认值作用域
hoststrlocalhostMySQL服务器地址
portint3306服务端口
userstrroot数据库用户
maxsizeint20最大连接数
minsizeint5最小连接数

1.2 MysqlClient子类扩展

class MysqlClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls(*args, **kwargs)  # 🏭 工厂方法创建实例async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):async with self._mysql_pool.acquire() as conn:  # 🛡️ 连接自动管理async with conn.cursor(DictCursor) as cur:  # 📋 字典游标await cur.execute(sql.replace('?', '%s'), param)  # ⚠️ SQL参数化return await (cur.fetchmany(rows) if rows else cur.fetchall())def select(self, *args, **kwargs):return self._loop.run_until_complete(self._select(*args, **kwargs))  # 🔄 同步化执行async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql.replace('?', '%s'), param)return cur.rowcount  # 📊 返回影响行数def execute(self, *args, **kwargs):return self._loop.run_until_complete(self._execute(*args, **kwargs))

二、实战操作示例

2.1 数据库列表查询

# 初始化客户端
mysql = MysqlClient.setup()# 查询所有数据库
databases = mysql.select(r'SHOW DATABASES;', rows=None)
print("数据库列表:")
for db in databases:print(f" - {db['Database']}")# 示例输出:
"""
数据库列表:- information_schema- myemployees- mysql- performance_schema- sys
"""

2.2 条件查询演示

# 带参数查询职位信息
jobs = mysql.select(r'SELECT * FROM myemployees.jobs WHERE JOB_ID = ?',('AC_ACCOUNT',),rows=None
)print("职位详情:")
print(f"职位ID:{jobs[0]['JOB_ID']}")
print(f"职位名称:{jobs[0]['JOB_TITLE']}")
print(f"薪资范围:{jobs[0]['MIN_SALARY']}-{jobs[0]['MAX_SALARY']}")# 示例输出:
"""
职位详情:
职位ID:AC_ACCOUNT
职位名称:Public Accountant
薪资范围:4200-9000
"""

2.3 数据更新操作

# 更新职位名称
affected_rows = mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?',('高级会计师', 'AC_ACCOUNT')
)print(f"更新影响行数:{affected_rows}")# 验证更新结果
updated_job = mysql.select(r'SELECT JOB_TITLE FROM myemployees.jobs WHERE JOB_ID = ?',('AC_ACCOUNT',)
)
print(f"更新后职位名称:{updated_job[0]['JOB_TITLE']}")# 示例输出:
"""
更新影响行数:1
更新后职位名称:高级会计师
"""

三、企业级优化建议

3.1 现存问题清单

问题描述风险等级改进方案
SQL参数替换不安全使用原生参数化查询
缺乏事务管理机制添加事务上下文管理器
未处理连接超时添加connect_timeout参数
同步异步混合使用统一异步协程实现
未实现连接重试机制集成tenacity重试库

3.2 增强型实现方案

from contextlib import asynccontextmanager
from tenacity import retry, stop_after_attemptclass SafeMysqlClient(MysqlClient):@retry(stop=stop_after_attempt(3))async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, param)  # ✅ 使用原生参数化return cur.rowcount@asynccontextmanagerasync def transaction(self):async with self._mysql_pool.acquire() as conn:async with conn.begin():  # 🛡️ 事务管理yield conn# 使用示例
async def safe_update():async with SafeMysqlClient.setup().transaction() as conn:await conn.execute("UPDATE ...")mysql._loop.run_until_complete(safe_update())

3.3 最佳实践指南

某金融系统数据库操作规范

  1. 所有写操作必须使用事务
  2. 查询结果超过100条需分页处理
  3. 敏感字段查询必须记录审计日志
  4. 生产环境禁止使用字符串拼接SQL
  5. 执行时间超过1秒的操作需添加超时控制
# 安全查询示例
async def secure_query():async with mysql._mysql_pool.acquire() as conn:async with conn.cursor(DictCursor) as cur:await cur.execute("SELECT * FROM users WHERE id = %s",(user_id,),  # ✅ 原生参数化timeout=5.0  # ⏱️ 查询超时设置)return await cur.fetchall()

四、完整代码

"""
Python :3.13.3
Selenium: 4.31.0database.py
"""
import asyncio
from chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH
from aiomysql import create_pool, DictCursor
from cx_Oracle import SessionPool
from asyncio import ensure_futureclass DataBase:def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):self._args, self._kwargs = args, kwargsself._autocommit = autocommitif database.lower() == 'mysql':self._database = create_poolself._ini = INIReader(DATABASE_INI_PATH).dataself._loop = asyncio.new_event_loop()asyncio.set_event_loop(self._loop)self._mysql_pool = self.mysql_poolif database.lower() == 'oracle':self._database = SessionPoolself._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool@propertydef oracle_pool(self):  # 建立Oracle连接池的方法return self._database(*self._args, **self._ini, **self._kwargs)@propertydef mysql_pool(self):  # 建立Mysql连接池的方法self._ini['autocommit'] = self._autocommitpool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))self._loop.run_until_complete(pool_task)return pool_task.result()class MysqlClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls(*args, **kwargs)async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):async with self._mysql_pool.acquire() as conn:async with conn.cursor(DictCursor) as cur:await cur.execute(sql.replace('?', '%s'), param)if rows:rs = await cur.fetchmany(rows)else:rs = await cur.fetchall()return rsdef select(self, *args, **kwargs):self._loop.run_until_complete(select_task := ensure_future(self._select(*args, **kwargs)))return select_task.result()async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql.replace('?', '%s'), param)return cur.rowcountdef execute(self, *args, **kwargs):self._loop.run_until_complete(execute_task := ensure_future(self._execute(*args, **kwargs)))return execute_task.result()mysql = MysqlClient.setup()
print(mysql.select(r'SHOW DATABASES;', (), rows=None))print(mysql.select(r'SELECT * FROM myemployees.jobs where JOB_ID=?', ('AC_ACCOUNT'), rows=None))
print(mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?', ('演示', 'AC_ACCOUNT')))

「小贴士」:点击头像→【关注】按钮,获取更多软件测试的晋升认知不迷路! 🚀

http://www.xdnf.cn/news/671653.html

相关文章:

  • 前端域名、端口、协议一样,本地缓存可以共享吗?
  • 【b站计算机拓荒者】【2025】微信小程序开发教程 - chapter3 项目实践 - 2信息采集
  • Protocol Buffers 复杂嵌套编译指南:生成 C++ 代码
  • JavaScript- 3.2 JavaScript实现不同显示器尺寸的响应式主题和页面
  • 开源酷炫大数据可视化大屏html+eacher 100+套
  • 力扣热题——分类求和并作差
  • Vue-02 (使用不同的 Vue CLI 插件)
  • 从 PyTorch 到 TensorFlow Lite:模型训练与推理
  • 【华为云物联网】iOtDA数据以表格字段转发OBS的设置攻略,便于以后数据上大屏
  • 如何描述BUG
  • VUE项目部署IIS服务器手册
  • 机器学习笔记【Week6】
  • 打板策略实战对比,khQuant回测横评第三弹【AI量化第29篇】
  • Nginx 在四大核心场景中的应用实践与优化
  • 深入解析 Flink 中的时间与窗口机制
  • 安卓证书的申请(保姆级图文)
  • Python服务器请求转发服务
  • KT6368A通过蓝牙芯片获取手机时间详细说明,对应串口指令举例
  • ubuntu中,c和c+程序,预编译、编译、链接和运行命令
  • 交换机 路由器
  • 2025 年江西研究生数学建模竞赛题A题电动汽车充电桩共享优化与电网安全协同模型完整思路 模型代码 结果 成品分享
  • 模板应用更新同步到所有开发中的应用
  • 思澈LCD-kit测试过程记录
  • 跳表(Skip List)查找算法详解
  • 2024年12月英语六级(第二套)真题+解析全24页
  • MySQL-5.7 修改密码和连接访问权限
  • 基于Python爬虫技术的对歌曲评论数据可视化分析系统
  • LabVIEW比例阀性能测试试验台
  • 【Python】日期计算和自动化运行脚本
  • 网站资源加载出现401错误