# Query with positional bind variables (:1, :2): values are passed as a list,
# and rows=1 requests a single row.
result1 = oracle.select(
    r'SELECT * FROM employees WHERE id = :1 AND name = :2',
    [101, 'Alice'],
    rows=1,
)

# Query with a named bind variable (:id): the value is passed as a keyword
# argument, and rows=None requests every matching row.
result2 = oracle.select(
    r'SELECT * FROM departments WHERE dept_id = :id',
    rows=None,
    id='D001',
)

print(f"员工信息:{result1}")
print(f"部门信息:{result2}")
# Batch update: one (title, emp_id) tuple per row to update.
batch_params = [('高级工程师', 'E101'), ('资深经理', 'M202')]
affected = oracle.execute(
    r'UPDATE positions SET title = :1 WHERE emp_id = :2',
    param=batch_params,
)

# Single-row update with named bind variables. An empty ``param`` selects the
# single-statement path, where the keyword arguments are bound by name.
single_affected = oracle.execute(
    r'UPDATE salaries SET amount = :amount WHERE emp_id = :id',
    param=[],
    amount=12000,
    id='E101',
)

print(f"批量更新影响行数:{affected}")
print(f"单条更新影响行数:{single_affected}")
示例输出
批量更新影响行数:2
单条更新影响行数:1
三、代码优化建议
3.1 现存问题清单
| 问题描述 | 风险等级 | 改进方案 |
| --- | --- | --- |
| 参数绑定语法错误 | 高 | 使用正确命名绑定语法 |
| 缺乏事务回滚机制 | 高 | 添加try/except回滚逻辑 |
| 结果集转换性能问题 | 中 | 使用高效字典生成方式 |
| 未处理空结果集情况 | 低 | 添加空值判断逻辑 |
3.2 增强型实现
from contextlib import contextmanager


class SafeOracleClient(OracleClient):
    """OracleClient variant that adds an explicit transaction scope."""

    @contextmanager
    def transaction(self):
        """Yield a connection from the Oracle pool inside a transaction.

        The connection is committed when the ``with`` body finishes normally.
        If the body raises an ``Exception`` the connection is rolled back and
        the exception is re-raised. The connection is closed in every case.
        """
        conn = self._oracle_pool.acquire()
        try:
            yield conn
            conn.commit()  # success: make the changes permanent
        except Exception:
            conn.rollback()  # failure: undo the partial work, then propagate
            raise
        finally:
            conn.close()

    def select(self, sql, param=None, rows=1, **kwargs):
        """Run a query through ``OracleClient.select`` with all bind values.

        Bind values are handed to the driver separately from the SQL text, so
        string values do not need to be filtered out. An earlier version
        dropped every ``str`` keyword argument, which left the corresponding
        placeholders without a value.

        :param sql: SQL text containing bind placeholders.
        :param param: optional positional bind values.
        :param rows: forwarded unchanged to the parent ``select``.
        :param kwargs: named bind values.
        :return: whatever the parent ``select`` returns.
        """
        return super().select(sql, param, rows, **kwargs)


# Usage example
with SafeOracleClient.setup().transaction() as conn:
    # Statements are executed on a cursor, not on the connection itself.
    with conn.cursor() as cur:
        cur.execute("...")
四、企业级最佳实践
4.1 某金融系统Oracle操作规范
参数化查询:禁止字符串拼接SQL
连接管理:单个事务时间不超过5秒
批量操作:每次最多1000条记录
审计日志:记录所有数据变更操作
性能规范:查询结果超过1万行需分页
# Compliant query example
def get_paginated_data(page=1, size=100):
    """Return one page of rows from ``transactions`` ordered by ``id``.

    :param page: 1-based page number.
    :param size: number of rows per page.
    :return: the result of ``oracle.select`` for the requested page.
    :raises ValueError: if ``page`` or ``size`` is smaller than 1.
    """
    if page < 1 or size < 1:
        raise ValueError(f'page and size must be >= 1, got page={page}, size={size}')
    offset = (page - 1) * size
    return oracle.select(
        "SELECT * FROM transactions ORDER BY id "
        "OFFSET :offset ROWS FETCH NEXT :size ROWS ONLY",
        # select() defaults to rows=1 (a single row); None fetches the whole
        # page that the OFFSET/FETCH clause already limits to ``size`` rows.
        rows=None,
        offset=offset,
        size=size,
    )
4.2 性能优化方案
# Use prepared statements
from cx_Oracle import Cursor


class OptimizedOracleClient(OracleClient):
    """OracleClient variant whose ``select`` prepares the statement explicitly."""

    def __init__(self, database='oracle', *args, **kwargs):
        # setup() calls cls('oracle', ...), so the constructor has to accept
        # those arguments and run the base initialiser that builds the pool.
        super().__init__(database, *args, **kwargs)

    def select(self, sql, **kwargs):
        """Prepare ``sql``, execute it with named binds and return all rows.

        Preparing is a cursor-level operation, so it is done on a cursor of a
        pooled connection instead of on the pool.
        NOTE(review): avoiding repeated parsing of the same SQL text is
        expected to come from the driver's per-connection statement cache --
        confirm the cache size configured for the pool.

        :param sql: SQL text containing named bind placeholders.
        :param kwargs: named bind values.
        :return: the rows produced by ``cursor.fetchall()``.
        """
        with self._oracle_pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.prepare(sql)
                # Passing None as the statement re-uses the prepared one.
                cur.execute(None, kwargs)
                return cur.fetchall()


# Usage example
optimized = OptimizedOracleClient.setup()
result = optimized.select("SELECT * FROM products WHERE category=:cat", cat='ELECTRONIC')
五、完整代码
"""
Pooled database clients for MySQL (aiomysql) and Oracle (cx_Oracle).

Python  : 3.13.3
Selenium: 4.31.0

database.py
"""
import asyncio
from asyncio import ensure_future
from typing import List

from aiomysql import DictCursor, create_pool
from cx_Oracle import SessionPool

from chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH


class DataBase:
    """Base class that builds a connection pool for the selected backend."""

    def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):
        """Create the pool for ``database``.

        :param database: ``'mysql'`` or ``'oracle'`` (case-insensitive).
        :param autocommit: stored on the instance; passed to the MySQL pool.
        :param args: extra positional arguments for the pool factory.
        :param kwargs: extra keyword arguments for the pool factory.
        :raises ValueError: if ``database`` names an unsupported backend.
        """
        self._args, self._kwargs = args, kwargs
        self._autocommit = autocommit
        backend = database.lower()
        if backend == 'mysql':
            self._database = create_pool
            self._ini = INIReader(DATABASE_INI_PATH).data
            # The MySQL driver is asynchronous; each client owns a loop that
            # its synchronous wrappers drive.
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._mysql_pool = self.mysql_pool
        elif backend == 'oracle':
            self._database = SessionPool
            self._ini = INIReader(DATABASE_INI_PATH, section='oracle').data
            self._oracle_pool = self.oracle_pool
        else:
            # Previously an unknown name produced an object without any pool.
            raise ValueError(f'unsupported database: {database!r}')

    @property
    def oracle_pool(self):
        """Build an Oracle session pool (a new pool on every access)."""
        return self._database(*self._args, **self._ini, **self._kwargs)

    @property
    def mysql_pool(self):
        """Build a MySQL connection pool (a new pool on every access)."""
        self._ini['autocommit'] = self._autocommit
        # Run the awaitable on this instance's own loop rather than on
        # whichever loop is current when the property is read.
        return self._loop.run_until_complete(
            self._database(*self._args, **self._ini, **self._kwargs)
        )


class MysqlClient(DataBase):
    """Synchronous facade over an aiomysql connection pool."""

    @classmethod
    def setup(cls, *args, **kwargs):
        """Alternate constructor; forwards all arguments to ``__init__``."""
        return cls(*args, **kwargs)

    async def _select(self, sql: str, param: tuple = (), rows: int | None = 1):
        """Execute a query and fetch rows through a ``DictCursor``.

        :param sql: SQL text; every ``?`` is replaced with ``%s`` before
            execution (this also affects ``?`` inside string literals).
        :param param: bind values.
        :param rows: number of rows for ``fetchmany``; a falsy value fetches
            all rows.
        """
        async with self._mysql_pool.acquire() as conn:
            async with conn.cursor(DictCursor) as cur:
                await cur.execute(sql.replace('?', '%s'), param)
                if rows:
                    return await cur.fetchmany(rows)
                return await cur.fetchall()

    def select(self, *args, **kwargs):
        """Blocking wrapper around ``_select``; same arguments and result."""
        return self._loop.run_until_complete(self._select(*args, **kwargs))

    async def _execute(self, sql: str, param: tuple = ()):
        """Execute a data-changing statement and return ``cursor.rowcount``.

        :param sql: SQL text; every ``?`` is replaced with ``%s``.
        :param param: bind values.
        """
        async with self._mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql.replace('?', '%s'), param)
                rowcount = cur.rowcount
            if not self._autocommit:
                # Without autocommit nothing else commits before the
                # connection goes back to the pool.
                await conn.commit()
            return rowcount

    def execute(self, *args, **kwargs):
        """Blocking wrapper around ``_execute``; same arguments and result."""
        return self._loop.run_until_complete(self._execute(*args, **kwargs))


# Usage example (MySQL):
# mysql = MysqlClient.setup()
# print(mysql.select(r'SHOW DATABASES;', (), rows=None))
# print(mysql.select(r'SELECT * FROM myemployees.jobs where JOB_ID=?', ('AC_ACCOUNT',), rows=None))
# print(mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?', ('演示', 'AC_ACCOUNT')))


class OracleClient(DataBase):
    """Client backed by a cx_Oracle session pool."""

    @classmethod
    def setup(cls, *args, **kwargs):
        """Alternate constructor that selects the Oracle backend."""
        return cls('oracle', *args, **kwargs)

    def select(self, sql: str, param: list | None = None, rows: int | None = 1, **kwargs):
        """Execute a query and return rows as ``{column: value}`` dicts.

        :param sql: SQL text with bind placeholders.
        :param param: positional bind values; mutually exclusive with kwargs.
        :param rows: ``1`` returns ``fetchone()``, another truthy value
            returns ``fetchmany(rows)``, a falsy value returns ``fetchall()``.
        :param kwargs: named bind values.
        :raises ValueError: if both ``param`` and ``kwargs`` are supplied.
        """
        if param and kwargs:
            raise ValueError(f'两种参数类型不能同时传入:{param}, {kwargs}')
        with self._oracle_pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, (param or kwargs))
                # First element of each description entry is the column name.
                columns = [col[0] for col in cur.description]
                cur.rowfactory = lambda *values: dict(zip(columns, values))
                if not rows:
                    return cur.fetchall()
                if rows == 1:
                    return cur.fetchone()
                return cur.fetchmany(rows)

    def execute(self, sql: str, param: List[tuple] | None = None, **kwargs):
        """Execute a data-changing statement, commit, and return the row count.

        :param sql: SQL text with bind placeholders.
        :param param: list of bind tuples for ``executemany``; when empty or
            omitted, a single ``execute`` is run with ``kwargs`` as named
            bind values.
        :param kwargs: named bind values for the single-statement path.
        :return: ``cursor.rowcount`` after execution.
        """
        with self._oracle_pool.acquire() as conn:
            with conn.cursor() as cur:
                if param:
                    cur.executemany(sql, param)
                else:
                    cur.execute(sql, **kwargs)
                rowcount = cur.rowcount
            conn.commit()
            return rowcount


oracle = OracleClient.setup()
# The same statements are issued twice each: once with positional bind values
# and once with named bind values.
select_sql = r'SELECT * FROM TABLEA WHERE ID = :ID AND NAME = :SAM'
update_sql = r'UPDATE DEMO_TABLE SET NAME = :SAM WHERE ID = :ID'

# Positional list of bind values, single row requested.
oracle.select(select_sql, [1, 'SAM'], 1)
# Named bind values, single row requested.
oracle.select(select_sql, rows=1, ID=1, SAM='SAM')
# Batch update: one tuple of bind values per row.
oracle.execute(update_sql, [('SAM', 1), ('TOM', 2)])
# Single update: empty batch list, bind values passed by name.
oracle.execute(update_sql, param=[], SAM='SAM', ID=1)