技能补全之Python操作MongoDB
Python操作MongoDB
- 〇、前言
- 一、MongoDB 操作符
- 1、数据存储形式
- 2、操作符
- 二、Python 操作 MongoDB
- 1、连接数据库
- 1.1、创建 client
- 1.2、获取 db
- 1.3、获取 collection
- 2、新增记录
- 3、查找记录
- 4、更新记录
- 5、删除记录
- 三、总结
〇、前言
大学时,对于数据库方面的学习,主要集中在关系型的MySQL 和非关系型的 Redis,对于 MongoDB 只是知道存在,而未有进一步的操作实践。手头上的工作,主要就是进行自动化测试脚本的维护,然而,因为业务特性,脚本里面常常涉及到 MongoDB的操作,因此,是时候将相关技能:Python操作MongoDB
给补全。
一、MongoDB 操作符
1、数据存储形式
MongoDB 是一种文档型数据库,一条记录就是一份文档,而文档内容通常类似如下:
{"_id": {"$oid": "68b39b5b01ee5ebf9be649aa"},"address": "大唐长安东四坊","birthDate": 19990914,"email": "2581676070@qq.com","modifiedTime": 1756606137412,"sex": 0,"username": "张三"}
每个文档都会有一个由MongoDB数据库引擎生成并维护的 _id
,而剩下的字段便是开发者自行存储的。每个字段,除了可以说基本的数据类型外,也可以是一个完整的对象。
数据展示上,MongoDB 所存储的文档就是 JSON 格式的数据对象, 而在实际存储和传输过程中,用的是 BSON(binary json)格式,但,总归而言就是 JSON 格式。而这也是为什么 Python 代码中可以直接以 dict
类型变量,去承接从 MongoDB 中查询出来的记录。
2、操作符
对于 JSON 格式的数据,都知道支持使用 json path
去访问对象字段,比如 $.store.book.name
,然而,采用JSON格式为数据封装形式的 MongoDB 并没有直接支持 JSON Path 操作符,但是,高效地查询又是数据操作过程中所必须的,因此,MongoDB 为自己量身定做了一套操作符。
MongoDB 操作符主要分为以下几类:
MongoDB 的查询操作符主要分为以下几类:
- 比较操作符
- $eq:匹配等于指定值的字段
- $ne:匹配不等于指定值的字段
- $gt:匹配大于指定值的字段
- $gte:匹配大于或等于指定值的字段
- $lt:匹配小于指定值的字段
- $lte:匹配小于或等于指定值的字段
- $in:匹配字段值在指定数组内的文档
- $nin:匹配字段值不在指定数组内的文档
- 逻辑操作符
- $and:组合多个条件,要求全部匹配
- $or:组合多个条件,匹配其中任意一个
- $not:匹配不满足条件的文档
- $nor:匹配所有条件均不满足的文档
- 数组操作符
- $all:匹配数组包含所有指定值的文档
- $elemMatch:匹配数组内至少一个元素满足所有指定条件
- $size:匹配数组长度为指定值的文档
- 元素操作符
- $exists:匹配包含(或不包含)指定字段的文档
- $type:匹配字段类型符合指定 BSON 类型的文档
- 评估操作符
- $regex:正则表达式匹配字段值
- $mod:对字段值取模运算后匹配
- $text:对文本索引字段执行全文搜索
- $where:通过 JavaScript 表达式自定义查询逻辑
- $jsonSchema:基于 JSON Schema 验证文档结构
- 地理空间操作符
- $geoWithin:匹配位于指定几何区域内的文档
- $geoIntersects:匹配与指定几何图形相交的文档
- $near / $nearSphere:匹配靠近指定点的文档
- 投影操作符
- $:投影匹配数组中的第一个元素
- $elemMatch:投影数组内满足条件的元素
- $meta:返回文本搜索的匹配分数或其他元数据
- 更新操作符
- $set:设置字段的值
- $unset:删除字段
- $inc:增加字段的值
- $push:向数组添加元素
- $pull:从数组移除元素
- 其他操作符
- $expr:在查询中使用聚合表达式
- $bitsAllClear / $bitsAllSet:按位运算匹配字段
二、Python 操作 MongoDB
Python 代码中操作 MongoDB,需要三方库 pymongo
的支持,所以,可以先在项目环境中,使用命令:
pip install pymongo
将依赖安装好。
另外,为了方便操作,本文所涉及的相关 MongoDB 操作,都是使用部署在本地的 MongoDB 服务,并且是不需要使用密码访问的。
1、连接数据库
首先,看一下如何使用 pymongo 连接数据库:
import unittestfrom pymongo import MongoClientclass TestLearnConnect(unittest.TestCase):def test_connect_localMongoDB(self):"""test connect to local mongodb:return: None"""# 1、创建客户端对象client = MongoClient()# 2、获取数据库对象db = client['examples']# 3、获取集合对象collection = db.get_collection("demo")# 4、查询数据result = collection.find_one({"opt": "insert"})# 5、打印结果print(f"result: {result}")# 6、断言self.assertIsNotNone(result)
由于操作 MongoDB 的基本单元是文档集合,所以,连接 MongoDB 数据库的过程中,通常会进一步地将目标文档集合连接起来。
1.1、创建 client
MongoClient 方法的原型如下:
如果缺省所有参数,则默认连接本地的MongoDB,因此,如果你的 MongoDB 服务不是部署在本地的,那么就需要显式传入参数,例如 client = MongoClient('10.8.44.25', 27017)
1.2、获取 db
获取 db 对象,有两种语法:
- 直接用
client['<db name>']
的形式获取 - 使用 client 对象的属性方法
get_database(<db name>)
获取
获取具体 db 的时候,如果原本不存在,那么会随着写入具体数据而创建并持久化;如果原本不存在,但没有写入具体数据,那么只会在内存中创建这个 db,并不会持久化。
1.3、获取 collection
获取 collection 同样有两种语法:
- 直接用
db[<collection name>]
获取 - 使用 db 对象的属性方法
get_collection(<collection name>)
获取
获取 collection 时,同样具有不存在则创建的特性,也同样需要在写入具体的文档记录后,才会进行持久化。
对于 collection 的创建,db 对象也提供了专门的属性方法 create_collection
,使用示例如下:
import datetime
import json
import unittestfrom pymongo import MongoClientclass LearnCreateCollection(unittest.TestCase):client = MongoClient('localhost', 27017)db = client.get_database("examples")def test_create_collectionByAuto(self):"""创建集合:return:"""# 方式一,使用 db[collection_name]的形式创建user_data = {"username": "彭友聪","sex": 1,"birthDate": 19990907,"address": "广东省深圳市宝安区西乡街道固戍社区","email": "2923616405@qq.com",}user = self.db['user']user.insert_one(user_data)count = user.count_documents(filter={"username": "彭友聪", "email": {"$eq": "2923616405@qq.com"}})self.assertGreater(count, 0)def test_create_collectionByManual(self):"""手动创建集合:return:"""startTimestamp = int(datetime.datetime.now().timestamp() * 1000)endTimestamp = startTimestamp + 86400000sport_record_date = {"userId": '201724073161',"sportType": 2,"sportName": "户外健走","date": 20250831,"startTimestamp": startTimestamp,"endTimestamp": endTimestamp,"metaData": json.dumps({"distance": 1000,"duration": 1000,"calorie": 1000,"heartRate": 114,"speed": 24,"steps": 10000,"elevation": 1000,"temperature": 3820,"humidity": 1000,"pressure": 1000,"light": 1000,"acceleration": 1000,})}sport_record = self.db.create_collection('sport_record')sport_record.insert_one(sport_record_date)count = sport_record.count_documents(filter={"userId": "201724073161", "sportName": "户外健走"})self.assertGreater(count, 0)
2、新增记录
持有了 collection 对象后,就可以进行增删改查的具体操作了。
首先,看一下如何进行记录的新增——毕竟新部署的数据库上,啥数据都没有。记录的新增可以是单条的 insert_one
,也可以是多条的 insert_many
,示例如下:
import unittestfrom pymongo import MongoClientclass LearnInsertDocument(unittest.TestCase):client = MongoClient('localhost', 27017)db = client.get_database("examples")def test_insert_one(self):user = self.db.get_collection("user")user_data = {"username": "吴仁荻","sex": 0,"birthDate": 19990914,"address": "","email": "2581676070@qq.com"}user.insert_one(user_data)res = user.find({"email": {"$in": ["2581676070@qq.com"]}})self.assertIsNotNone(res)self.assertEqual(res[0]["username"], "吴雅男")self.assertEqual(res[0]["sex"], 0)self.assertEqual(res[0]["birthDate"], 19990914)self.assertEqual(res[0]["address"], "")self.assertEqual(res[0]["email"], "2581676070@qq.com")def test_insert_many(self):user = self.db.get_collection("user")user_data = [{"username": "张三","sex": 0,"birthDate": 19990914,"address": "大唐长安东四坊","email": "2581676070@qq.com"},{"username": "李四","sex": 0,"birthDate": 19990914,"address": "大唐长安东四坊","email": "2581676070@qq.com"}]user.insert_many(user_data)username_pattern = r"^[\一-\龥]+$"res = user.find({"username": {"$regex": username_pattern}})self.assertIsNotNone(res)print("\n")for item in res:self.assertIsNotNone(item["username"])self.assertIsNotNone(item["sex"])self.assertIsNotNone(item["birthDate"])self.assertIsNotNone(item["address"])self.assertIsNotNone(item["email"])print(f"user info: {item}")
为了确保数据真的插入到 MongoDB 中,可以使用 find
或 find_one
查找记录。
3、查找记录
查找记录主要使用 find_one
和 find
方法,顾名思义,前者用于查找单一记录。对于 find_one
的使用,需要配合能够精确定位的筛选条件,否则可能返回非预期的数据;findd_one 在查找记录时,如果所使用的筛选记录不够精确,那么只会按照自然顺序返回第一个文档记录;所以,对于 find_one 所使用的筛选条件,不妨先用 count_documents
方法统计一下匹配记录的数量,并断言数量为1 的时候,才去执行 find_one 语句。
import unittestfrom pymongo import MongoClientclass LearnFindDocument(unittest.TestCase):client = MongoClient('localhost', 27017)db = client.get_database("examples")def test_find_one(self):collection = self.db.get_collection("user")count = collection.count_documents(filter={"sex": 1})self.assertEqual(1, count)result = collection.find_one(filter={"sex": 1})self.assertIsNotNone(result)self.assertEqual("彭友聪", result['username'])self.assertEqual("2923616405@qq.com", result['email'])def test_find_many(self):collection = self.db.get_collection("user")count = collection.count_documents(filter={"sex": {"$lt": 1}})self.assertGreater(count, 1)result = collection.find(filter={"sex": {"$lt": 1}})self.assertIsNotNone(result)for item in result:self.assertIsInstance(item, dict)self.assertIsInstance(item['username'], str)self.assertNotEqual("", item['username'])self.assertIsInstance(item['birthDate'], int)self.assertNotEqual(0, item['birthDate'])def test_find_manyByCondition(self):user = self.db.get_collection("user")# 查询两个汉字组成的用户名pattern = r"^.{2}$"# 方式1:使用 $where操作符result = user.find(filter={"username": {"$exists": True}, "$where": "this.username.length === 2"})self.assertIsNotNone(result)result_list = result.to_list()self.assertGreater(len(result_list), 0)print(f"result length: {len(result_list)}")# 方式2:使用正则表达式result = user.find(filter={"username": {"$regex": pattern}})self.assertIsNotNone(result)result_list = result.to_list()self.assertGreater(len(result_list), 0)print(f"result length: {len(result_list)}")
查询记录时,为了尽可能准确的筛选出目标记录,可以使用合适的操作符,尤其是当需要使用阈值去过滤记录的时候。
4、更新记录
更新记录,既可以是对已存在的文档字段赋予新的数据值,也可以是向已经存在的文档记录插入新的字段。
import datetime
import unittestfrom pymongo import MongoClientclass LearnUpdateDocument(unittest.TestCase):client = MongoClient('localhost', 27017)db = client.get_database("examples")collection = db.get_collection("user")def test_update_one(self):"""更新一条数据:return:"""new_address = "广东省深圳市龙岗区或南山区"result = self.collection.update_one(filter={"username": {"$eq": "吴雅男"}, "address": {"$exists": True}},update={"$set": {"address": new_address}})print(result.modified_count)result = self.collection.find_one({"username": {"$eq": "吴雅男"}})self.assertNotEqual("", result['address'])self.assertEqual(new_address, result['address'])def test_update_many(self):"""更新多条数据:return:"""cur_timestamp = int(datetime.datetime.now().timestamp() * 1000)username_pattern = r"^[\一-\龥]+$"result = self.collection.update_many(filter={"sex": {"$gte": 0}, "username": {"$regex": username_pattern}, "modifiedTime": {"$exists": False}},update={"$set": {"modifiedTime": cur_timestamp}})self.assertGreaterEqual(result.modified_count, 0)print(result.modified_count)result = self.collection.find({"$where": "this.username.length > 0"})for item in result:self.assertIn("modifiedTime", item)
增加字段的操作,对应到 MySQL 操作,就是更新表结构。修改文档的时候,不论是对已经存在的字段进行数据值更新,还是增加不存在的字段,都建议结合 $exists
操作符,尤其是插入新字段的操作,$exists
操作符可以有效避免数据结构被破坏。
5、删除记录
删除记录同样支持单一删除和批量删除,前者使用 delete_one
,后者使用 delete_many
,同样的,为了确保删除操作没有删错记录,筛选条件语句中建议使用合适的操作符。
import datetime
import unittestfrom pymongo import MongoClientclass LearnDeleteDocument(unittest.TestCase):client = MongoClient('localhost', 27017)db = client.get_database("examples")collection = db["user"]def test_delete_one(self):"""删除一个文档:return:"""user_data = {"address": "广东省深圳市宝安区西乡街道固戍社区","birthDate": 19990907,"email": "2923616405@qq.com","modifiedTime": int(datetime.datetime.now().timestamp() * 1000),"sex": 1,"username": "Tom Jason"}count = self.collection.count_documents(filter={"username": {"$eq": "Tom Jason"}})self.assertEqual(0, count)self.collection.insert_one(user_data)count = self.collection.count_documents(filter={"username": {"$eq": "Tom Jason"}})self.assertEqual(1, count)result = self.collection.delete_one(filter={"username": {"$eq": "Tom Jason"}})self.assertEqual(1, result.deleted_count)count = self.collection.count_documents(filter={"username": {"$eq": "Tom Jason"}})self.assertEqual(0, count)def test_delete_many(self):"""删除多个文档:return:"""user_data = [{"address": "广东省深圳市宝安区西乡街道固戍社区","birthDate": 19990907,"email": "2923616405@qq.com","modifiedTime": int(datetime.datetime.now().timestamp() * 1000),"sex": 1,"username": "Tom Jason"},{"address": "广东省深圳市宝安区西乡街道固戍社区","birthDate": 19990907,"email": "2923616405@qq.com","modifiedTime": int(datetime.datetime.now().timestamp() * 1000),"sex": 1,"username": "Mico Jackson"},{"address": "广东省深圳市宝安区西乡街道固戍社区","birthDate": 19990907,"email": "2923616405@qq.com","modifiedTime": int(datetime.datetime.now().timestamp() * 1000),"sex": 1,"username": "Jammy Fores"},]username_pattern = r'^[A-Z][a-z]+ [A-Z][a-z]+$'count = self.collection.count_documents(filter={"username": {"$regex": username_pattern}})self.assertEqual(0, count)self.collection.insert_many(user_data)count = self.collection.count_documents(filter={"username": {"$regex": username_pattern}})self.assertGreater(count, 0)result = self.collection.delete_many(filter={"username": {"$regex": username_pattern}})self.assertEqual(count, result.deleted_count)count = self.collection.count_documents(filter={"username": {"$regex": username_pattern}})self.assertEqual(0, count)
三、总结
经过上面的学习,我相信屏幕前的你,一定对如何使用 Python 操作 MongoDB 有了进一步的认知,此外,上面那些结合 unittest 框架的代码案例,基本上都保证了相关操作的准确性——因为不符合预期的结果将导致相关断言发生失败。