pymongo配置事务环境并封装事务功能
数据库事务我们或多或少都听说过,它是解决数据一致性、保持数据完整性的常用方案。Mongodb在2018年6月发布的4.0版本中开始引入事务功能,支持多文档的ACID特性,但Mongodb的事务功能只允许在副本集群成员或mongos环境中使用。
一、解决事务异常
异常信息:Transaction numbers are only allowed on a replica set member or mongos
异常相关资料:mongodb-community、mongodb-transactions-error
本机安装Mongodb之后保持默认配置,那就是单机模式。单机模式的Mongodb不支持事务,直接使用会报如下图所示的错误,为了解决图中的问题我们需要配置Mongodb环境。
二、配置Mongodb环境
1、关闭服务
- 连接服务:mongodb://localhost:27017/
- 切换到admin:use admin
- 关闭服务:db.adminCommand({ shutdown:1, comment:"Convert to cluster" })
2、修改Mongodb的配置 
3、初始化集群
- 连接服务:mongodb://localhost:27017/
- 初始化:rs.initiate()
- 查看集群配置:rs.conf()
4、往集群中添加节点
- 添加节点:rs.add()
- 如果除本机外没有其他节点就不用添加了,我的本地集群环境就只有一个节点
- 切换成集群模式后,再重新连接服务,就会提示当前处在哪个节点(rs0),以及是否为主节点(primary)
三、定义事务装饰器
- 如果大家对python装饰器有疑问或感兴趣请移步:python装饰器
- python事务和sql事务类似,都是通过一个session来控制一组数据库操作的提交、回滚
- 重点是要处理好提交、回滚的时机
from typing import Any
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.client_session import _TxnState, ClientSession
from dmspycommon.models.mongo.mongo_settings import MongoSettingsclass TransactionManager:def __init__(self, mongo_settings: MongoSettings):self.client = MongoClient(mongo_settings.connection_string)self.session: ClientSession = Nonedef with_transaction(self, func, commit_transaction: bool = False):def wrapper(*args, **kwargs) -> Any:# Init ClientSessionif self.session == None:self.session = self.client.start_session()try:# Start transactionif not self.session.in_transaction:self.session.start_transaction(read_concern=ReadConcern('majority'), write_concern=WriteConcern("majority"))try:# Execute raw functionresult = func(*args, **kwargs)# Commit transactionif (commit_transaction == True and self.transaction_is_running()):self.session.commit_transaction()return resultexcept PyMongoError as err:# Rollback transactionif (self.transaction_is_running()):self.session.abort_transaction()raise errexcept Exception as ex:# Rollback transactionif (self.transaction_is_running()):self.session.abort_transaction()raise exfinally:# End sessionif (self.session._transaction.state == _TxnState.COMMITTED or self.session._transaction.state == _TxnState.ABORTED):self.session.end_session()self.session = Nonereturn wrapperdef transaction_is_running(self):return self.session != None and (self.session._transaction.state == _TxnState.STARTING or self.session._transaction.state == _TxnState.IN_PROGRESS)
四、改造Service来支持事务
第一个改动点是要把TransactionManager对象(内含事务装饰器)传到具体的服务中,如果多个服务的DB操作需要处在一个事务中那初始化时就用同一个TransactionManager对象,这个在测试事务功能时会有体现;
第二个改动点是DB操作时对于支持session参数的接口,必须要把session参数值传进去,这个在下面的两个类中会有体现;
import uuid
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.client_session import ClientSession
from dmspycommon.utils.mongo.trans.base_trans_service import BaseTransService
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.utils.property_util import get_attr_value, set_attr_valueclass TransactionClassService(BaseTransService):def __init__(self, transaction_manager: TransactionManager = None):super().__init__(transaction_manager)#self.transaction_manager: TransactionManager = transaction_managerself.db_name: str = "keai-well"self.client: MongoClient = self.transaction_manager.clientself.session: ClientSession = self.transaction_manager.sessionself.db: Database = self.client[self.db_name]def insert_data(self, data, collection_name: str, tenant_id: str = None):"""插入数据的示例方法:param data: 待插入的数据:param collection_name: 集合名:param tenant_id: 租户Id:return: 插入结果"""if (data['age']<60):raise ValueError(f'Age is less than 60')collection = self.db[collection_name]result = collection.insert_one(data, session=self.get_session())return result.inserted_iddef insert_main_data(self, data ,tenant_id: str = None):if (data['age']<60):raise ValueError(f'Age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['main']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_main_data, id = {inserted_id}')self.insert_sub_data(inserted_id, tenant_id)self.insert_detail_data(inserted_id, tenant_id)return inserted_id def insert_sub_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'address':'Alice', 'subject':'Chinese', 'score':72}if (data['score']<60):raise ValueError(f'score is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['sub']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_sub_data, id = {inserted_id}')def insert_detail_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'company':'Google', 'father':'Guoguo', 'age':82}if (data['age']<60):raise ValueError(f'age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['detail']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_detail_data, id = {inserted_id}')if __name__ == '__main__':tran_svc = TransactionClassService("keai-well")data = {'name':'Alice', 'age':62}#insert_result = tran_svc.insert_main_data(data=data, commit_tran=CommitTran.Yes)insert_main_data = tran_svc.transaction_manager.with_transaction(tran_svc.insert_main_data, True)insert_main_data(data=data)
import uuid
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.client_session import ClientSession
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.utils.property_util import get_attr_value, set_attr_valueclass TransactionSecondClassService:def __init__(self, transaction_manager: TransactionManager = None):self.transaction_manager: TransactionManager = transaction_managerself.db_name: str = "keai-well"self.client: MongoClient = self.transaction_manager.clientself.session: ClientSession = self.transaction_manager.sessionself.db: Database = self.client[self.db_name]def insert_secondmain_data(self, data ,tenant_id: str = None):if (data['age']<60):raise ValueError(f'Age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['secondmain']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_secondmain_data, id = {inserted_id}')self.insert_secondsub_data(inserted_id, tenant_id)self.insert_seconddetail_data(inserted_id, tenant_id)return inserted_id def insert_secondsub_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'address':'Alice', 'subject':'Chinese', 'score':72}if (data['score']<60):raise ValueError(f'score is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['secondsub']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_secondsub_data, id = {inserted_id}')def insert_seconddetail_data(self, main_id, tenant_id: str = None):#把'age':82改成52,程序会抛异常导致DB持久化失败,由于我们使用了事务,所以最终DB一条数据也没插入成功data = {'mainId':main_id, 'company':'Google', 'father':'Guoguo', 'age':82}if (data['age']<60):raise ValueError(f'age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['seconddetail']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_seconddetail_data, id = {inserted_id}')def __get_session(self):return self.transaction_manager.session if self.transaction_manager is not None else None
五、进一步封装
细心的朋友可能发现了第四步中 TransactionClassService 继承了父类 BaseTransService,我其实是把跟事务相关的共用对象和共性操作封装到了 BaseTransService 中,这样能减少重复代码。至于 TransactionSecondClassService 没继承父类,是为了留下跟 TransactionClassService 做个对比,如果大家在实际使用过程中有更好的思路欢迎在评论区留言讨论。
from dmspycommon.utils.mongo.transaction_manager import TransactionManagerclass BaseTransService:def __init__(self, transaction_manager: TransactionManager = None) -> None:self.transaction_manager: TransactionManager = transaction_managerdef with_transaction(self, func, commit_transaction: bool = False):return self.transaction_manager.with_transaction(func, commit_transaction)def get_session(self):return self.transaction_manager.session if self.transaction_manager is not None else None
六、测试事务功能
from dmspycommon.utils.mongo.trans.transaction_class_service import TransactionClassService
from dmspycommon.utils.mongo.trans.transaction_second_class_service import TransactionSecondClassService
from dmspycommon.utils.mongo.trans.transaction_third_class_service import TransactionThirdClassService
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.models.mongo.mongo_settings import MongoSettingsclass TransactionTester:def __init__(self):passdef init_data(self):mongo_settings: MongoSettings = MongoSettings()mongo_settings.connection_string = "mongodb://localhost:27017"tran_manager = TransactionManager(mongo_settings)tran_manager_02 = TransactionManager(mongo_settings)#对象初始化时使用相同的TransactionManager对象,其内部的DB操作会处于同一个事务中tran_svc = TransactionClassService(tran_manager)tran_second_svc = TransactionSecondClassService(tran_manager)tran_third_svc = TransactionThirdClassService(tran_manager)data = {'name':'Alice', 'age':62}insert_main_data = tran_svc.with_transaction(tran_svc.insert_main_data)insert_secondmain_data = tran_second_svc.transaction_manager.with_transaction(tran_second_svc.insert_secondmain_data)insert_thirdmain_data = tran_third_svc.transaction_manager.with_transaction(tran_third_svc.insert_thirdmain_data, True)insert_main_data(data=data)insert_secondmain_data(data=data)insert_thirdmain_data(data=data)if __name__ == '__main__':tran_tester = TransactionTester()insert_main_data = tran_tester.init_data()print("tran_tester.init_data() 执行完毕")
七、总结
到此为止pymongo数据库事务的封装及使用方法就说完了,主要是依靠Python装饰器来实现,再加上对事务操作的封装和对具体业务Service进行事务化改造,这样从上到下就都能支持事务操作了,如果本文对您有帮助的话,请点赞、收藏、评论支持下,如果有更好的思路,也欢迎在评论区留言讨论,谢谢大家。
为了方便大家分享工作心得、交流技术问题,我创建了QQ群389591879,
大家也可以在里边相互了解各自公司的信息,希望能对大家有所帮助,同道中人欢迎加群。