32. 自动化测试开发之建立mysql和oracle数据库连接池
自动化测试开发之数据库连接池实现解析
一、核心类结构解析
1.1 类初始化方法
class DataBase:def __init__(self, database='mysql', autocommit=True, *args, **kwargs):# 基础参数存储self._args, self._kwargs = args, kwargsself._autocommit = autocommit # ✅ 自动提交设置# 数据库类型判断if database.lower() == 'mysql':self._database = create_pool # 🎯 异步连接池创建方法self._ini = INIReader(DATABASE_INI_PATH).data # 📖 读取配置self._loop = get_event_loop() # 🔄 获取事件循环self._mysql_pool = self.mysql_pool # ⚡ 立即初始化连接池elif database.lower() == 'oracle':self._database = SessionPool # 🗃️ Oracle连接池类self._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool # ⚡ 立即初始化
参数说明表
参数 | 类型 | 默认值 | 作用描述 |
---|---|---|---|
database | str | ‘mysql’ | 数据库类型(mysql/oracle) |
autocommit | bool | True | 是否自动提交事务 |
*args | - | - | 透传给连接池的额外位置参数 |
**kwargs | - | - | 透传给连接池的额外关键字参数 |
1.2 连接池属性方法
@property
def oracle_pool(self):# 🚀 创建Oracle连接池return self._database(*self._args, **self._ini, **self._kwargs)@property
def mysql_pool(self):# ⚡ 异步创建MySQL连接池self._ini['autocommit'] = self._autocommit # 🔄 注入自动提交配置pool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))self._loop.run_until_complete(pool_task) # 🛠️ 同步等待异步任务return pool_task.result() # 🎉 返回连接池实例
二、实战使用示例
2.1 配置文件示例(database.ini)
[mysql]
host = localhost
port = 3306
user = tester
password = Test@1234
db = testdb
maxsize = 20
minsize = 5[oracle]
user = system
password = Oracle123
dsn = 10.0.0.1:1521/ORCL
min = 5
max = 20
increment = 1
2.2 MySQL连接测试
# 实例化MySQL连接池
mysql_db = DataBase(database='mysql')# 执行异步查询
async def query_data():async with mysql_db._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute("SELECT version()")return await cur.fetchone()result = mysql_db._loop.run_until_complete(query_data())
print(f"MySQL版本: {result[0]}")
输出示例
MySQL版本: 8.0.26
2.3 Oracle连接测试
# 实例化Oracle连接池
oracle_db = DataBase(database='oracle')# 执行同步查询
with oracle_db._oracle_pool.acquire() as conn:cursor = conn.cursor()cursor.execute("SELECT * FROM dual")print(cursor.fetchone())
输出示例
('X',)
三、企业级优化建议
3.1 现存问题清单
问题描述 | 风险等级 | 改进方案 |
---|---|---|
混合异步/同步实现 | 高 | 统一使用异步模式 |
缺乏连接泄漏检测机制 | 高 | 添加连接状态监控 |
未处理连接失败重试 | 中 | 实现指数退避重试策略 |
未验证连接池有效性 | 中 | 添加心跳检测机制 |
3.2 增强型连接池实现
from backoff import expo, on_exceptionclass SafeDataBase(DataBase):@on_exception(expo, Exception, max_tries=3)def mysql_pool(self):return super().mysql_pool@propertydef oracle_pool(self):pool = super().oracle_pool# 🫀 心跳检测with pool.acquire() as conn:conn.ping()return pool# 使用示例
safe_db = SafeDataBase('mysql')
3.3 最佳实践指南
某金融企业数据库访问规范:
- 生产环境必须使用加密连接(SSL/TLS)
- 连接池最大尺寸不超过50
- 每个SQL操作必须设置超时时间
- 敏感信息必须通过Vault管理
- 所有访问操作必须记录审计日志
# 安全连接示例
ssl_ctx = ssl.create_default_context(cafile="ca.pem")
ssl_ctx.verify_mode = ssl.CERT_REQUIREDsecure_db = DataBase(database='mysql',ssl=ssl_ctx,connect_timeout=5,read_timeout=30
)
四、完整代码
from chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH
from aiomysql import create_pool
from cx_Oracle import SessionPool
from asyncio import get_event_loop, ensure_futureclass DataBase:def __init__(self, database:str='mysql', autocommit:bool=True, *args, **kwargs):self._args, self._kwargs = args, kwargsself._autocommit = autocommitif database.lower()=='mysql':self._database = create_poolself._ini = INIReader(DATABASE_INI_PATH).dataself._loop = get_event_loop()self._mysql_pool = self.mysql_poolif database.lower()=='oracle':self._database = SessionPoolself._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool@propertydef oracle_pool(self):# 建立Oracle连接池的方法return self._database(*self._args, **self._ini, **self._kwargs)@propertydef mysql_pool(self):# 建立Mysql连接池的方法self._ini['autocommit'] = self._autocommitpool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))self._loop.run_until_complete(pool_task)return pool_task.result()
「小贴士」:点击头像→【关注】按钮,获取更多软件测试的晋升认知不迷路! 🚀