33. 自动化测试开发之使用mysql异步连接池实现mysql数据库操作
Python自动化测试之数据库操作封装深度解析
一、核心类结构设计
1.1 DataBase基类实现
class DataBase:def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):# 参数存储self._args, self._kwargs = args, kwargsself._autocommit = autocommit # ✅ 自动提交设置if database.lower() == 'mysql':self._database = create_pool # 🎯 aiomysql异步连接池self._ini = INIReader(DATABASE_INI_PATH).data # 📖 配置读取self._loop = asyncio.new_event_loop() # 🔄 创建独立事件循环asyncio.set_event_loop(self._loop)self._mysql_pool = self.mysql_pool # ⚡ 立即初始化elif database.lower() == 'oracle':self._database = SessionPool # 🗃️ cx_Oracle连接池self._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool # ⚡ 立即初始化
参数配置说明表
参数 | 类型 | 默认值 | 作用域 |
---|---|---|---|
host | str | localhost | MySQL服务器地址 |
port | int | 3306 | 服务端口 |
user | str | root | 数据库用户 |
maxsize | int | 20 | 最大连接数 |
minsize | int | 5 | 最小连接数 |
1.2 MysqlClient子类扩展
class MysqlClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls(*args, **kwargs) # 🏭 工厂方法创建实例async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):async with self._mysql_pool.acquire() as conn: # 🛡️ 连接自动管理async with conn.cursor(DictCursor) as cur: # 📋 字典游标await cur.execute(sql.replace('?', '%s'), param) # ⚠️ SQL参数化return await (cur.fetchmany(rows) if rows else cur.fetchall())def select(self, *args, **kwargs):return self._loop.run_until_complete(self._select(*args, **kwargs)) # 🔄 同步化执行async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql.replace('?', '%s'), param)return cur.rowcount # 📊 返回影响行数def execute(self, *args, **kwargs):return self._loop.run_until_complete(self._execute(*args, **kwargs))
二、实战操作示例
2.1 数据库列表查询
# 初始化客户端
mysql = MysqlClient.setup()# 查询所有数据库
databases = mysql.select(r'SHOW DATABASES;', rows=None)
print("数据库列表:")
for db in databases:print(f" - {db['Database']}")# 示例输出:
"""
数据库列表:- information_schema- myemployees- mysql- performance_schema- sys
"""
2.2 条件查询演示
# 带参数查询职位信息
jobs = mysql.select(r'SELECT * FROM myemployees.jobs WHERE JOB_ID = ?',('AC_ACCOUNT',),rows=None
)print("职位详情:")
print(f"职位ID:{jobs[0]['JOB_ID']}")
print(f"职位名称:{jobs[0]['JOB_TITLE']}")
print(f"薪资范围:{jobs[0]['MIN_SALARY']}-{jobs[0]['MAX_SALARY']}")# 示例输出:
"""
职位详情:
职位ID:AC_ACCOUNT
职位名称:Public Accountant
薪资范围:4200-9000
"""
2.3 数据更新操作
# 更新职位名称
affected_rows = mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?',('高级会计师', 'AC_ACCOUNT')
)print(f"更新影响行数:{affected_rows}")# 验证更新结果
updated_job = mysql.select(r'SELECT JOB_TITLE FROM myemployees.jobs WHERE JOB_ID = ?',('AC_ACCOUNT',)
)
print(f"更新后职位名称:{updated_job[0]['JOB_TITLE']}")# 示例输出:
"""
更新影响行数:1
更新后职位名称:高级会计师
"""
三、企业级优化建议
3.1 现存问题清单
问题描述 | 风险等级 | 改进方案 |
---|---|---|
SQL参数替换不安全 | 高 | 使用原生参数化查询 |
缺乏事务管理机制 | 高 | 添加事务上下文管理器 |
未处理连接超时 | 中 | 添加connect_timeout参数 |
同步异步混合使用 | 中 | 统一异步协程实现 |
未实现连接重试机制 | 低 | 集成tenacity重试库 |
3.2 增强型实现方案
from contextlib import asynccontextmanager
from tenacity import retry, stop_after_attemptclass SafeMysqlClient(MysqlClient):@retry(stop=stop_after_attempt(3))async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, param) # ✅ 使用原生参数化return cur.rowcount@asynccontextmanagerasync def transaction(self):async with self._mysql_pool.acquire() as conn:async with conn.begin(): # 🛡️ 事务管理yield conn# 使用示例
async def safe_update():async with SafeMysqlClient.setup().transaction() as conn:await conn.execute("UPDATE ...")mysql._loop.run_until_complete(safe_update())
3.3 最佳实践指南
某金融系统数据库操作规范:
- 所有写操作必须使用事务
- 查询结果超过100条需分页处理
- 敏感字段查询必须记录审计日志
- 生产环境禁止使用字符串拼接SQL
- 执行时间超过1秒的操作需添加超时控制
# 安全查询示例
async def secure_query():async with mysql._mysql_pool.acquire() as conn:async with conn.cursor(DictCursor) as cur:await cur.execute("SELECT * FROM users WHERE id = %s",(user_id,), # ✅ 原生参数化timeout=5.0 # ⏱️ 查询超时设置)return await cur.fetchall()
四、完整代码
"""
Python :3.13.3
Selenium: 4.31.0database.py
"""
import asyncio
from chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH
from aiomysql import create_pool, DictCursor
from cx_Oracle import SessionPool
from asyncio import ensure_futureclass DataBase:def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):self._args, self._kwargs = args, kwargsself._autocommit = autocommitif database.lower() == 'mysql':self._database = create_poolself._ini = INIReader(DATABASE_INI_PATH).dataself._loop = asyncio.new_event_loop()asyncio.set_event_loop(self._loop)self._mysql_pool = self.mysql_poolif database.lower() == 'oracle':self._database = SessionPoolself._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool@propertydef oracle_pool(self): # 建立Oracle连接池的方法return self._database(*self._args, **self._ini, **self._kwargs)@propertydef mysql_pool(self): # 建立Mysql连接池的方法self._ini['autocommit'] = self._autocommitpool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))self._loop.run_until_complete(pool_task)return pool_task.result()class MysqlClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls(*args, **kwargs)async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):async with self._mysql_pool.acquire() as conn:async with conn.cursor(DictCursor) as cur:await cur.execute(sql.replace('?', '%s'), param)if rows:rs = await cur.fetchmany(rows)else:rs = await cur.fetchall()return rsdef select(self, *args, **kwargs):self._loop.run_until_complete(select_task := ensure_future(self._select(*args, **kwargs)))return select_task.result()async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql.replace('?', '%s'), param)return cur.rowcountdef execute(self, *args, **kwargs):self._loop.run_until_complete(execute_task := ensure_future(self._execute(*args, **kwargs)))return execute_task.result()mysql = MysqlClient.setup()
print(mysql.select(r'SHOW DATABASES;', (), rows=None))print(mysql.select(r'SELECT * FROM myemployees.jobs where JOB_ID=?', ('AC_ACCOUNT'), rows=None))
print(mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?', ('演示', 'AC_ACCOUNT')))
「小贴士」:点击头像→【关注】按钮,获取更多软件测试的晋升认知不迷路! 🚀