当前位置: 首页 > news >正文

34. 自动化测试开发之使用oracle连接池实现oracle数据库操作

Python自动化测试之Oracle数据库操作封装解析

一、OracleClient核心实现

1.1 类初始化与配置

class OracleClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls('oracle', *args, **kwargs)  # 🎯 强制指定数据库类型def select(self, sql: str, param: [list, None] = None, rows: [int, None] = 1, **kwargs):# 🚫 参数互斥检查if param and kwargs:raise Exception(f'参数类型冲突: {param}, {kwargs}')# 🛠️ 连接管理with self._oracle_pool.acquire() as conn:with conn.cursor() as cur:cur.execute(sql, (param or kwargs))  # ⚠️ 参数绑定方式# 📊 结果集转字典columns = [col[0] for col in cur.description]  # ✅ 获取字段名cur.rowfactory = lambda *args: dict(zip(columns, args))# 🎯 获取指定行数if rows:rs = cur.fetchone() if rows == 1 else cur.fetchmany(rows)else:rs = cur.fetchall()return rs
参数说明表
参数类型必需性作用描述
sqlstrSQL查询语句
paramlist/tuple位置参数列表
kwargsdict命名参数键值对
rowsint/None返回行数限制

1.2 数据操作实现

def execute(self, sql: str, param: List[tuple], **kwargs):with self._oracle_pool.acquire() as conn:with conn.cursor() as cur:# 📦 批量操作支持if param:cur.executemany(sql, param)  # 🚀 批量执行else:cur.execute(sql, **kwargs)  # ⚡ 单条执行rowcount = cur.rowcount  # 📊 影响行数conn.commit()  # ✅ 显式提交事务return rowcount
方法特性对比
特性select方法execute方法
返回类型字典列表影响行数
事务管理自动提交显式提交
参数绑定方式位置/命名参数批量/单条
结果集处理自动转字典

二、实战操作示例

2.1 查询操作演示

# 位置参数查询
result1 = oracle.select(r'SELECT * FROM employees WHERE id = :1 AND name = :2',[101, 'Alice'],rows=1
)# 命名参数查询
result2 = oracle.select(r'SELECT * FROM departments WHERE dept_id = :id',rows=None,id='D001'
)print(f"员工信息:{result1}")
print(f"部门信息:{result2}")
示例输出
员工信息:{'ID': 101, 'NAME': 'Alice', 'SALARY': 8500}
部门信息:[{'DEPT_ID':'D001', 'NAME':'研发部'}, {...}]

2.2 数据更新操作

# 批量更新
batch_params = [('高级工程师', 'E101'),('资深经理', 'M202')
]
affected = oracle.execute(r'UPDATE positions SET title = :1 WHERE emp_id = :2',param=batch_params
)# 单条更新
single_affected = oracle.execute(r'UPDATE salaries SET amount = :amount WHERE emp_id = :id',amount=12000,id='E101'
)print(f"批量更新影响行数:{affected}")
print(f"单条更新影响行数:{single_affected}")
示例输出
批量更新影响行数:2
单条更新影响行数:1

三、代码优化建议

3.1 现存问题清单

问题描述风险等级改进方案
参数绑定语法错误使用正确命名绑定语法
缺乏事务回滚机制添加try/except回滚逻辑
结果集转换性能问题使用高效字典生成方式
未处理空结果集情况添加空值判断逻辑

3.2 增强型实现

from contextlib import contextmanagerclass SafeOracleClient(OracleClient):@contextmanagerdef transaction(self):conn = self._oracle_pool.acquire()try:yield connconn.commit()  # ✅ 成功提交except Exception as e:conn.rollback()  # 🚨 异常回滚raisefinally:conn.close()def select(self, sql, **kwargs):# 🛡️ 安全参数处理sanitized = {k: v for k, v in kwargs.items() if not isinstance(v, str)}return super().select(sql, **sanitized)# 使用示例
with SafeOracleClient.setup().transaction() as conn:conn.execute("...")

四、企业级最佳实践

4.1 某金融系统Oracle操作规范

  1. 参数化查询:禁止字符串拼接SQL
  2. 连接管理:单个事务时间不超过5秒
  3. 批量操作:每次最多1000条记录
  4. 审计日志:记录所有数据变更操作
  5. 性能规范:查询结果超过1万行需分页
# 合规查询示例
def get_paginated_data(page=1, size=100):offset = (page-1)*sizereturn oracle.select("SELECT * FROM transactions ORDER BY id ""OFFSET :offset ROWS FETCH NEXT :size ROWS ONLY",offset=offset,size=size)

4.2 性能优化方案

# 使用预编译语句
from cx_Oracle import Cursorclass OptimizedOracleClient(OracleClient):def __init__(self):self._prepared = {}  # 📦 缓存预编译语句def select(self, sql, **kwargs):if sql not in self._prepared:stmt = self._oracle_pool.prepare(sql)self._prepared[sql] = stmtreturn self._prepared[sql].execute(**kwargs).fetchall()# 使用示例
optimized = OptimizedOracleClient.setup()
result = optimized.select("SELECT * FROM products WHERE category=:cat", cat='ELECTRONIC')

五、完整代码

"""
Python :3.13.3
Selenium: 4.31.0database.py
"""
import asynciofrom chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH
from aiomysql import create_pool, DictCursor
from cx_Oracle import SessionPool
from asyncio import ensure_future
from typing import Listclass DataBase:def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):self._args, self._kwargs = args, kwargsself._autocommit = autocommitif database.lower() == 'mysql':self._database = create_poolself._ini = INIReader(DATABASE_INI_PATH).dataself._loop = asyncio.new_event_loop()asyncio.set_event_loop(self._loop)self._mysql_pool = self.mysql_poolif database.lower() == 'oracle':self._database = SessionPoolself._ini = INIReader(DATABASE_INI_PATH, section='oracle').dataself._oracle_pool = self.oracle_pool@propertydef oracle_pool(self):  # 建立Oracle连接池的方法return self._database(*self._args, **self._ini, **self._kwargs)@propertydef mysql_pool(self):  # 建立Mysql连接池的方法self._ini['autocommit'] = self._autocommitpool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))self._loop.run_until_complete(pool_task)return pool_task.result()class MysqlClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls(*args, **kwargs)async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):async with self._mysql_pool.acquire() as conn:async with conn.cursor(DictCursor) as cur:await cur.execute(sql.replace('?', '%s'), param)if rows:rs = await cur.fetchmany(rows)else:rs = await cur.fetchall()return rsdef select(self, *args, **kwargs):self._loop.run_until_complete(select_task := ensure_future(self._select(*args, **kwargs)))return select_task.result()async def _execute(self, sql: str, param: tuple = ()):async with self._mysql_pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql.replace('?', '%s'), param)return cur.rowcountdef execute(self, *args, **kwargs):self._loop.run_until_complete(execute_task := ensure_future(self._execute(*args, **kwargs)))return execute_task.result()# mysql = MysqlClient.setup()
# print(mysql.select(r'SHOW DATABASES;', (), rows=None))
# print(mysql.select(r'SELECT * FROM myemployees.jobs where JOB_ID=?', ('AC_ACCOUNT'), rows=None))
# print(mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?', ('演示', 'AC_ACCOUNT')))class OracleClient(DataBase):@classmethoddef setup(cls, *args, **kwargs):return cls('oracle', *args, **kwargs)def select(self, sql: str, param: [list, None] = None, rows: [int, None] = 1, **kwargs):if param and kwargs:raise Exception(f'两种参数类型不能同时传入:{param}, {kwargs}')with self._oracle_pool.acquire() as conn:with conn.cursor() as cur:cur.execute(sql, (param or kwargs))columns = [col[0] for col in cur.desciption]cur.rowfactory = lambda *args: dict(zip(columns, args))if rows:if rows == 1:rs = cur.fetchone()else:rs = cur.fetchmany(rows)else:rs = cur.fetchall()return rsdef execute(self, sql: str, param: List[tuple], **kwargs):with self._oracle_pool.acquire() as conn:with conn.cursor() as cur:if param:cur.executemany(sql, param)else:cur.execute(sql, **kwargs)rowcount = cur.rowcountconn.commit()return rowcountoracle = OracleClient.setup()
oracle.select(r'SELECT * FROM TABLEA WHERE ID = :ID AND NAME = :SAM', [1, 'SAM'], 1)
oracle.select(r'SELECT * FROM TABLEA WHERE ID = :ID AND NAME = :SAM', rows=1, ID=1, SAM='SAM')
oracle.execute(r'UPDATE DEMO_TABLE SET NAME = :SAM WHERE ID = :ID',[('SAM', 1), ('TOM', 2)])
oracle.execute(r'UPDATE DEMO_TABLE SET NAME = :SAM WHERE ID = :ID', param=[], SAM='SAM', ID=1)

「小贴士」:点击头像→【关注】按钮,获取更多软件测试的晋升认知不迷路! 🚀

http://www.xdnf.cn/news/666613.html

相关文章:

  • 鸿蒙OSUniApp 开发的商品筛选器与排序功能#三方框架 #Uniapp
  • mediapipe标注视频姿态关键点
  • LVS 负载均衡群集
  • Reactor和Proactor
  • Docker部署Spark大数据组件
  • 【Java项目实战】智能截图工具V2.0:集成Tesseract OCR实现中英文识别功能完整开发教程
  • 【Qt开发】多元素控件
  • JavaScript性能优化全景指南
  • QT 框架学习笔记
  • Elasticsearch性能优化全解析
  • uni-app(6):Vue3语法基础下
  • Nginx 性能优化全解析:从进程到安全的深度实践
  • 【JavaScript 性能优化方法】
  • 【前端】【Vue3】vue3性能优化总结
  • MySQL 窗口函数深度解析:语法、应用场景与性能优化
  • day 23 机器学习管道(pipeline)
  • 项目启动以及Vue初识
  • Python整合Milvus向量数据库案例实战
  • 通过HIVE SQL获取每个用户的最大连续登录时常
  • 【Opencv+Yolo】Day2_图像处理
  • Vim 常用命令
  • 《数据结构初阶》【番外篇:快速排序的前世今生】
  • MySQL的主从复制
  • MYSQL 学习笔记
  • Django ToDoWeb 服务
  • 4.8.5 利用Spark SQL统计网站每月访问量
  • sharding jdbc的使用,如何在Spring中实现数据库的主从分离、分库分表等功能
  • Java· swing 小demo
  • EasyDarwin的配置与使用
  • MMAction2重要的几个配置参数