当前位置: 首页 > news >正文

pymongo配置事务环境并封装事务功能

数据库事务我们或多或少都听说过,它是解决数据一致性、保持数据完整性的常用方案。Mongodb在2018年6月发布的4.0版本中开始引入事务功能,支持多文档的ACID特性,但Mongodb的事务功能只允许在副本集群成员或mongos环境中使用。

一、解决事务异常

异常信息:Transaction numbers are only allowed on a replica set member or mongos

异常相关资料:mongodb-community、mongodb-transactions-error

本机安装Mongodb之后保持默认配置,那就是单机模式。单机模式的Mongodb不支持事务,直接使用会报如下图所示的错误,为了解决图中的问题我们需要配置Mongodb环境。

二、配置Mongodb环境

1、关闭服务

  • 连接服务:mongodb://localhost:27017/
  • 切换到admin:use admin
  • 关闭服务:db.adminCommand({ shutdown:1, comment:"Convert to cluster" })

2、修改Mongodb的配置 

3、初始化集群

  • 连接服务:mongodb://localhost:27017/
  • 初始化:rs.initiate()
  • 查看集群配置:rs.conf()

4、往集群中添加节点

  • 添加节点:rs.add()
  • 如果除本机外没有其他节点就不用添加了,我的本地集群环境就只有一个节点
  • 切换成集群模式后,再重新连接服务,就会提示当前处在哪个节点(rs0),以及是否为主节点(primary)

三、定义事务装饰器

  • 如果大家对python装饰器有疑问或感兴趣请移步:python装饰器
  • python事务和sql事务类似,都是通过一个session来控制一组数据库操作的提交、回滚
  • 重点是要处理好提交、回滚的时机
from typing import Any
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.client_session import _TxnState, ClientSession
from dmspycommon.models.mongo.mongo_settings import MongoSettingsclass TransactionManager:def __init__(self, mongo_settings: MongoSettings):self.client = MongoClient(mongo_settings.connection_string)self.session: ClientSession  = Nonedef with_transaction(self, func, commit_transaction: bool = False):def wrapper(*args, **kwargs) -> Any:# Init ClientSessionif self.session == None:self.session = self.client.start_session()try:# Start transactionif not self.session.in_transaction:self.session.start_transaction(read_concern=ReadConcern('majority'), write_concern=WriteConcern("majority"))try:# Execute raw functionresult = func(*args, **kwargs)# Commit transactionif (commit_transaction == True and self.transaction_is_running()):self.session.commit_transaction()return resultexcept PyMongoError as err:# Rollback transactionif (self.transaction_is_running()):self.session.abort_transaction()raise errexcept Exception as ex:# Rollback transactionif (self.transaction_is_running()):self.session.abort_transaction()raise exfinally:# End sessionif (self.session._transaction.state == _TxnState.COMMITTED or self.session._transaction.state == _TxnState.ABORTED):self.session.end_session()self.session = Nonereturn wrapperdef transaction_is_running(self):return self.session != None and (self.session._transaction.state == _TxnState.STARTING or self.session._transaction.state == _TxnState.IN_PROGRESS)

四、改造Service来支持事务

第一个改动点是要把TransactionManager对象(内含事务装饰器)传到具体的服务中,如果多个服务的DB操作需要处在一个事务中那初始化时就用同一个TransactionManager对象,这个在测试事务功能时会有体现;

第二个改动点是DB操作时对于支持session参数的接口,必须要把session参数值传进去,这个在下面的两个类中会有体现;

import uuid
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.client_session import ClientSession
from dmspycommon.utils.mongo.trans.base_trans_service import BaseTransService
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.utils.property_util import get_attr_value, set_attr_valueclass TransactionClassService(BaseTransService):def __init__(self, transaction_manager: TransactionManager = None):super().__init__(transaction_manager)#self.transaction_manager: TransactionManager = transaction_managerself.db_name: str = "keai-well"self.client: MongoClient = self.transaction_manager.clientself.session: ClientSession = self.transaction_manager.sessionself.db: Database = self.client[self.db_name]def insert_data(self, data, collection_name: str, tenant_id: str = None):"""插入数据的示例方法:param data: 待插入的数据:param collection_name: 集合名:param tenant_id: 租户Id:return: 插入结果"""if (data['age']<60):raise ValueError(f'Age is less than 60')collection = self.db[collection_name]result = collection.insert_one(data, session=self.get_session())return result.inserted_iddef insert_main_data(self, data ,tenant_id: str = None):if (data['age']<60):raise ValueError(f'Age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['main']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_main_data, id = {inserted_id}')self.insert_sub_data(inserted_id, tenant_id)self.insert_detail_data(inserted_id, tenant_id)return inserted_id def insert_sub_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'address':'Alice', 'subject':'Chinese', 'score':72}if (data['score']<60):raise ValueError(f'score is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['sub']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_sub_data, id = {inserted_id}')def insert_detail_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'company':'Google', 'father':'Guoguo', 'age':82}if (data['age']<60):raise ValueError(f'age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['detail']result = collection.insert_one(data, session=self.get_session())inserted_id = result.inserted_idprint(f'insert_detail_data, id = {inserted_id}')if __name__ == '__main__':tran_svc = TransactionClassService("keai-well")data = {'name':'Alice', 'age':62}#insert_result = tran_svc.insert_main_data(data=data, commit_tran=CommitTran.Yes)insert_main_data = tran_svc.transaction_manager.with_transaction(tran_svc.insert_main_data, True)insert_main_data(data=data)
import uuid
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.client_session import ClientSession
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.utils.property_util import get_attr_value, set_attr_valueclass TransactionSecondClassService:def __init__(self, transaction_manager: TransactionManager = None):self.transaction_manager: TransactionManager = transaction_managerself.db_name: str = "keai-well"self.client: MongoClient = self.transaction_manager.clientself.session: ClientSession = self.transaction_manager.sessionself.db: Database = self.client[self.db_name]def insert_secondmain_data(self, data ,tenant_id: str = None):if (data['age']<60):raise ValueError(f'Age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['secondmain']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_secondmain_data, id = {inserted_id}')self.insert_secondsub_data(inserted_id, tenant_id)self.insert_seconddetail_data(inserted_id, tenant_id)return inserted_id def insert_secondsub_data(self, main_id, tenant_id: str = None):data = {'mainId':main_id, 'address':'Alice', 'subject':'Chinese', 'score':72}if (data['score']<60):raise ValueError(f'score is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['secondsub']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_secondsub_data, id = {inserted_id}')def insert_seconddetail_data(self, main_id, tenant_id: str = None):#把'age':82改成52,程序会抛异常导致DB持久化失败,由于我们使用了事务,所以最终DB一条数据也没插入成功data = {'mainId':main_id, 'company':'Google', 'father':'Guoguo', 'age':82}if (data['age']<60):raise ValueError(f'age is less than 60')if not get_attr_value(data, '_id'):set_attr_value(data, '_id', f"{uuid.uuid1()}")collection = self.db['seconddetail']result = collection.insert_one(data, session=self.__get_session())inserted_id = result.inserted_idprint(f'insert_seconddetail_data, id = {inserted_id}')def __get_session(self):return self.transaction_manager.session if self.transaction_manager is not None else None

五、进一步封装

细心的朋友可能发现了第四步中 TransactionClassService 继承了父类 BaseTransService,我其实是把跟事务相关的共用对象和共性操作封装到了 BaseTransService 中,这样能减少重复代码。至于 TransactionSecondClassService 没继承父类,是为了留下跟 TransactionClassService 做个对比,如果大家在实际使用过程中有更好的思路欢迎在评论区留言讨论。

from dmspycommon.utils.mongo.transaction_manager import TransactionManagerclass BaseTransService:def __init__(self, transaction_manager: TransactionManager = None) -> None:self.transaction_manager: TransactionManager = transaction_managerdef with_transaction(self, func, commit_transaction: bool = False):return self.transaction_manager.with_transaction(func, commit_transaction)def get_session(self):return self.transaction_manager.session if self.transaction_manager is not None else None

六、测试事务功能

from dmspycommon.utils.mongo.trans.transaction_class_service import TransactionClassService
from dmspycommon.utils.mongo.trans.transaction_second_class_service import TransactionSecondClassService
from dmspycommon.utils.mongo.trans.transaction_third_class_service import TransactionThirdClassService
from dmspycommon.utils.mongo.transaction_manager import TransactionManager
from dmspycommon.models.mongo.mongo_settings import MongoSettingsclass TransactionTester:def __init__(self):passdef init_data(self):mongo_settings: MongoSettings = MongoSettings()mongo_settings.connection_string = "mongodb://localhost:27017"tran_manager = TransactionManager(mongo_settings)tran_manager_02 = TransactionManager(mongo_settings)#对象初始化时使用相同的TransactionManager对象,其内部的DB操作会处于同一个事务中tran_svc = TransactionClassService(tran_manager)tran_second_svc = TransactionSecondClassService(tran_manager)tran_third_svc = TransactionThirdClassService(tran_manager)data = {'name':'Alice', 'age':62}insert_main_data = tran_svc.with_transaction(tran_svc.insert_main_data)insert_secondmain_data = tran_second_svc.transaction_manager.with_transaction(tran_second_svc.insert_secondmain_data)insert_thirdmain_data = tran_third_svc.transaction_manager.with_transaction(tran_third_svc.insert_thirdmain_data, True)insert_main_data(data=data)insert_secondmain_data(data=data)insert_thirdmain_data(data=data)if __name__ == '__main__':tran_tester = TransactionTester()insert_main_data = tran_tester.init_data()print("tran_tester.init_data() 执行完毕")

七、总结

到此为止pymongo数据库事务的封装及使用方法就说完了,主要是依靠Python装饰器来实现,再加上对事务操作的封装和对具体业务Service进行事务化改造,这样从上到下就都能支持事务操作了,如果本文对您有帮助的话,请点赞、收藏、评论支持下,如果有更好的思路,也欢迎在评论区留言讨论,谢谢大家。

为了方便大家分享工作心得、交流技术问题,我创建了QQ群389591879,
大家也可以在里边相互了解各自公司的信息,希望能对大家有所帮助,同道中人欢迎加群。

http://www.xdnf.cn/news/942697.html

相关文章:

  • JDBC基础关键_001_认识
  • Spring类型转换器相关接口和实现原理
  • 【JavaScript】利用`localStorage`实现多窗口数据交互同步【附完整源码】
  • OD 算法题 B卷【删除字符串中出现次数最少的字符】
  • 如何禁用windows server系统自动更新并防止自动重启
  • 推理式奖励模型:使用自然语言反馈改进强化学习效果
  • 卫星接收天线G/T值怎么计算?附G/T计算excel表格链接
  • 打卡day48
  • 12.7Swing控件5 JProgressBar
  • Spring AI中使用ChatMemory实现会话记忆功能
  • 算法打卡第18天
  • 【CUDA 】第5章 共享内存和常量内存——5.3减少全局内存访问(2)
  • Linux 环境配置
  • 【立体匹配】:双目立体匹配SGBM:(1)运行
  • 深入解析JavaScript构造函数与原型链
  • JavaScript 自定义对象详解
  • AI医生时代来临!o1模型在医疗诊断中超越人类医生
  • 查看进程线程的方法
  • 进制符号表示
  • 【阿里巴巴 x 浙江大学】信息与交互设计 - 信息设计漫谈
  • AIGC 基础篇 Python基础 02
  • MS8312A 车规 精密、低噪、CMOS、轨到轨输入输出运算放大器,用于传感器、条形扫描器
  • arxir网址自动转向国内镜像
  • 【DTOF传感器】光子飞行时间传感技术
  • 通信之光端机
  • 苏超 - 江苏省城市足球联赛
  • Angular中Webpack与ngx-build-plus 浅学
  • 【刷题模板】链表、堆栈
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年6月8日第102弹
  • 06. C#入门系列【自定义类型】:从青铜到王者的进阶之路