当前位置: 首页 > ds >正文

多线程爬虫中实现线程安全的MySQL连接池

多线程爬虫中实现线程安全的MySQL连接池

在日常开发中,数据库操作频繁建立/关闭连接会带来性能损耗,尤其在多线程场景中更容易出现连接复用、阻塞等问题。因此,本文介绍如何使用 Python 封装一个 线程安全的 MySQL 连接池,并通过 threading 模拟多线程高并发操作数据库。


一、项目背景

  • 目标: 封装一个通用的、可复用的、线程安全的 MySQL 连接池类
  • 实现: 使用 DBUtilsPooledDB 实现底层连接池逻辑,支持最大连接数、最小缓存连接数等参数
  • 特点:
    • 多线程安全
    • 自动释放连接
    • 封装常用的 CRUD 操作
    • 内置异常处理与日志输出

二、依赖准备

pip install pymysql DBUtils

三、连接池封装代码(ConnectionPool)

# CoreUtils/Sql.py
import pymysql
import logging
import traceback
from DBUtils.PooledDB import PooledDBclass ConnectionPool:"""多线程同步连接池"""def __init__(self, host, database, user=None, password=None,port=3306, charset="utf8mb4", max_connections=10,min_cached=2, max_cached=5, blocking=True):"""初始化连接池"""self._pool = PooledDB(creator=pymysql,maxconnections=max_connections,mincached=min_cached,maxcached=max_cached,blocking=blocking,host=host,port=port,user=user,password=password,database=database,charset=charset,use_unicode=True,cursorclass=pymysql.cursors.DictCursor,autocommit=True)def _get_conn(self):return self._pool.connection()def query(self, sql, params=None):conn = self._get_conn()cursor = conn.cursor()try:cursor.execute(sql, params)return cursor.fetchall()finally:cursor.close()conn.close()def get(self, sql, params=None):conn = self._get_conn()cursor = conn.cursor()try:cursor.execute(sql, params)return cursor.fetchone()finally:cursor.close()conn.close()def execute(self, sql, params=None):conn = self._get_conn()cursor = conn.cursor()try:return cursor.execute(sql, params)except Exception as e:traceback.print_exc()logging.error(f"SQL执行错误: {e}\nSQL: {sql}\nParams: {params}")raisefinally:cursor.close()conn.close()def insert(self, sql, params=None):conn = self._get_conn()cursor = conn.cursor()try:cursor.execute(sql, params)return cursor.lastrowidexcept Exception as e:logging.error(f"插入出错: {e}\nSQL: {sql}\nParams: {params}")raisefinally:cursor.close()conn.close()def table_has(self, table_name, field, value):sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"return self.get(sql, value)def table_insert(self, table_name, item: dict):fields = list(item.keys())values = list(item.values())placeholders = ','.join(['%s'] * len(fields))field_list = ','.join(fields)sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"try:return self.execute(sql, values)except pymysql.MySQLError as e:if e.args[0] == 1062:logging.warning("重复插入被跳过")else:logging.error("插入数据出错: %s\n数据: %s", e, item)raisedef table_update(self, table_name, updates: dict, field_where: str, value_where):set_clause = ', '.join([f"{k}=%s" for k in updates])values = list(updates.values()) + [value_where]sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"self.execute(sql, values)

四、多线程测试代码

以下代码通过 10 个线程并发对数据库进行增删改查,测试连接池的稳定性与日志输出的整洁性。

import threading
import logging
from CoreUtils.Sql import ConnectionPool# 日志格式配置
logging.basicConfig(level=logging.INFO,format='[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)# 初始化连接池
pool = ConnectionPool(host='localhost',database='test_db',user='root',password='your_password',port=3306
)def test_multithread():def thread_task(i):name = f"User-{i}"age = 20 + itry:pool.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", (name, age))logging.info(f"插入成功: {name}")data = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))logging.info(f"查询结果: {data}")pool.execute("UPDATE test_table SET age = age + 1 WHERE name = %s", (name,))logging.info("年龄更新完成")updated = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))logging.info(f"更新后数据: {updated}")pool.execute("DELETE FROM test_table WHERE name=%s", (name,))logging.info("删除完成")pool.table_insert("test_table", {"name": f"{name}_new", "age": age})logging.info("table_insert 成功")exists = pool.table_has("test_table", "name", f"{name}_new")logging.info(f"table_has 检查: {exists is not None}")pool.table_update("test_table", {"age": 99}, "name", f"{name}_new")logging.info("table_update 完成")except Exception as e:logging.exception("线程异常")threads = []for i in range(10):t = threading.Thread(target=thread_task, args=(i,), name=f"Thread-{i}")threads.append(t)t.start()for t in threads:t.join()logging.info("多线程测试完成")if __name__ == "__main__":test_multithread()

五、准备测试数据表

CREATE TABLE test_table (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(100) UNIQUE NOT NULL,age INT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

六、运行结果示意

运行后,你将看到类似如下整齐的日志输出:

在这里插入图片描述

你可以根据实际项目需要将日志输出到文件中(通过 filename='xxx.log' 配置 logging.basicConfig())。


七、总结

本文介绍了一个线程安全的 MySQL 连接池封装方式,并通过多线程场景验证其并发稳定性。在高并发读写、日志整洁输出、连接复用等方面表现良好,适用于中小型 Python 爬虫项目中的数据库访问层封装。

http://www.xdnf.cn/news/3417.html

相关文章:

  • Java程序员如何设计一个高并发系统?
  • 基于MCP协议实现一个智能审核流程
  • 虚拟内存笔记(一)
  • AVPro Video加载视频文件并播放,可指定视频文件的位置、路径等参数
  • 运用ESS(弹性伸缩)技术实现服务能力的纵向扩展
  • foxmail时不时发送不了邮件问题定位解决过程
  • 苍穹外卖11
  • Windows查看和修改IP,IP互相ping通
  • 使用模块中的`XPath`语法提取非结构化数据
  • Learning vtkjs之ImageMarchingCubes
  • 100 个 NumPy 练习
  • centos安装nginx
  • 新手小白如何查找科研论文?
  • 2025深圳杯东三省数学建模竞赛选题建议+初步分析
  • 26个脑影像工具包合集分享:从预处理到SCI成图
  • 为什么定位关闭了还显示IP属地?
  • 软考中级-软件设计师 数据库(手写笔记)
  • TS类型体操练习
  • Rancher 2.6.3企业级容器管理平台部署实践
  • ESP32-C3 Secure Boot 使用多个签名 Key
  • FEKO许可管理
  • YOLO11改进-模块-引入跨模态注意力机制CMA 提高多尺度 遮挡
  • 6轴、智能、低功耗惯性测量单元BMI270及其OIS接口
  • 开源 RAG 框架对比:LangChain、Haystack、DSPy 技术选型指南
  • 常用矩阵求导
  • Java父类、子类实例初始化顺序详解
  • 92.一个简单的输入与显示示例 Maui例子 C#例子
  • 论文速读 - 通过提示工程创建全面的合成数据集以支持医疗领域模型训练
  • 【Scrapy】简单项目实战--爬取dangdang图书信息
  • 柯希霍夫积分法偏移成像中数据分布不均匀的处理方法