Implementing a Thread-Safe MySQL Connection Pool for Multithreaded Crawlers
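In day-to-day development, opening and closing a database connection for every operation wastes resources. In multithreaded code it also invites problems such as one connection being shared across threads, or threads blocking while they wait on the database. This article shows how to wrap a thread-safe MySQL connection pool in Python, and then uses `threading` to simulate highly concurrent database access from multiple threads.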
I. Project Background
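- Goal: wrap a general-purpose, reusable, thread-safe MySQL connection pool in a single class
- Implementation: the underlying pooling logic comes from `PooledDB` in DBUtils, which supports parameters such as the maximum number of connections and the minimum number of cached (idle) connections
- Features:
  - Safe to use from multiple threads
  - Connections are returned to the pool automatically after each call
  - Wrappers for common CRUD operations
  - Built-in exception handling and logging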
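The features listed above can be illustrated with the standard library alone, without a MySQL server. The sketch below is a toy model, not how `PooledDB` is implemented internally. `SimplePool` and its `factory` argument are hypothetical names, and `factory` stands in for something like `pymysql.connect`. A `BoundedSemaphore` caps how many connections are checked out at once, so further callers block, much like `blocking=True`. A thread-safe queue keeps idle connections for reuse, and a context manager hands the connection back even when the caller raises.

```python
import queue
import threading
from contextlib import contextmanager


class SimplePool:
    """Toy connection pool (illustration only, hypothetical class).

    `factory` is any zero-argument callable that opens a connection; it is an
    assumed stand-in for a real driver call such as pymysql.connect.
    """

    def __init__(self, factory, max_connections=3):
        self._factory = factory
        self._idle = queue.LifoQueue()  # thread-safe store of idle connections
        # Caps how many connections can be checked out at the same time.
        self._slots = threading.BoundedSemaphore(max_connections)

    @contextmanager
    def connection(self):
        self._slots.acquire()  # blocks while all slots are in use
        try:
            try:
                conn = self._idle.get_nowait()  # reuse an idle connection if any
            except queue.Empty:
                conn = self._factory()  # otherwise open a new one
            try:
                yield conn
            finally:
                self._idle.put(conn)  # always hand it back, even on error
        finally:
            self._slots.release()
```

In the article's `ConnectionPool`, `self._pool.connection()` plays the role of borrowing, and the `conn.close()` call in each `finally` block plays the role of handing the connection back.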
II. Dependencies

```bash
pip install pymysql DBUtils
```
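The pip command above installs the latest DBUtils release. Starting with DBUtils 2.0, the package is named `dbutils` (lowercase) and `PooledDB` is imported from `dbutils.pooled_db`. The `DBUtils.PooledDB` path used by older tutorials exists only in 1.x. If the same code has to run against either version, a small loader can try both paths. The sketch below uses a hypothetical helper, `load_pooled_db`:

```python
import importlib


def load_pooled_db():
    """Return the PooledDB class from whichever DBUtils layout is installed, or None."""
    # Try the DBUtils >= 2.0 module path first, then the legacy 1.x path.
    for module_name in ("dbutils.pooled_db", "DBUtils.PooledDB"):
        try:
            module = importlib.import_module(module_name)
        except ImportError:  # also covers ModuleNotFoundError
            continue
        return module.PooledDB
    return None
```

The helper returns `None` when neither layout is installed, so the caller can raise a clear error that suggests `pip install DBUtils`.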
III. Connection Pool Wrapper (ConnectionPool)
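```python
# CoreUtils/Sql.py
import logging
import traceback

import pymysql
# DBUtils >= 2.0 uses the lowercase package name. On DBUtils 1.x use
# "from DBUtils.PooledDB import PooledDB" instead.
from dbutils.pooled_db import PooledDB


class ConnectionPool:
    """Thread-safe (synchronous) MySQL connection pool."""

    def __init__(self, host, database, user=None, password=None,
                 port=3306, charset="utf8mb4", max_connections=10,
                 min_cached=2, max_cached=5, blocking=True):
        """Initialize the pool; connection keywords are passed on to pymysql.connect."""
        self._pool = PooledDB(
            creator=pymysql,
            maxconnections=max_connections,  # upper limit on connections in use
            mincached=min_cached,            # idle connections opened at startup
            maxcached=max_cached,            # max idle connections kept in the pool
            blocking=blocking,               # wait instead of raising when the limit is hit
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset=charset,
            use_unicode=True,
            cursorclass=pymysql.cursors.DictCursor,  # rows are returned as dicts
            autocommit=True,                         # every statement commits immediately
        )

    def _get_conn(self):
        # Borrow a connection; calling close() on it returns it to the pool.
        return self._pool.connection()

    def query(self, sql, params=None):
        """Run a SELECT and return all rows."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchall()
        finally:
            cursor.close()
            conn.close()

    def get(self, sql, params=None):
        """Run a SELECT and return the first row, or None."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchone()
        finally:
            cursor.close()
            conn.close()

    def execute(self, sql, params=None):
        """Run a statement (UPDATE, DELETE, ...) and return the affected row count."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            return cursor.execute(sql, params)
        except Exception as e:
            traceback.print_exc()
            logging.error(f"SQL execution error: {e}\nSQL: {sql}\nParams: {params}")
            raise
        finally:
            cursor.close()
            conn.close()

    def insert(self, sql, params=None):
        """Run an INSERT and return the AUTO_INCREMENT id of the new row."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Insert error: {e}\nSQL: {sql}\nParams: {params}")
            raise
        finally:
            cursor.close()
            conn.close()

    def table_has(self, table_name, field, value):
        """Return the first row whose `field` equals `value`, or None."""
        # Only `value` is parameterized; table_name and field are inserted as-is.
        sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"
        return self.get(sql, (value,))

    def table_insert(self, table_name, item: dict):
        """Insert a dict as one row; duplicate-key errors (MySQL 1062) are skipped."""
        fields = list(item.keys())
        values = list(item.values())
        placeholders = ','.join(['%s'] * len(fields))
        field_list = ','.join(fields)
        sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"
        try:
            return self.execute(sql, values)
        except pymysql.MySQLError as e:
            # Note: execute() has already logged this error before re-raising it.
            if e.args[0] == 1062:  # ER_DUP_ENTRY
                logging.warning("Duplicate insert skipped")
            else:
                logging.error("Failed to insert data: %s\nData: %s", e, item)
                raise

    def table_update(self, table_name, updates: dict, field_where: str, value_where):
        """UPDATE table_name SET <updates> WHERE field_where = value_where."""
        set_clause = ', '.join([f"{k}=%s" for k in updates])
        values = list(updates.values()) + [value_where]
        sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"
        return self.execute(sql, values)
```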
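`table_has`, `table_insert` and `table_update` pass only the values as query parameters. Table and column names are pasted into the SQL with f-strings, because `%s` placeholders can bind values but not identifiers. That is fine for hard-coded names, but in a crawler the dict keys may come from scraped data. One way to guard against this is sketched below with two hypothetical helpers, `quote_identifier` and `build_insert_sql`. The sketch assumes a whitelist policy: identifiers may contain only letters, digits and underscores, and each one is wrapped in MySQL backticks.

```python
import re

# Assumed policy: plain identifiers only (letters, digits, underscore).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Validate a table/column name and wrap it in MySQL backticks."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return f"`{name}`"


def build_insert_sql(table_name: str, item: dict):
    """Return (sql, values) shaped like table_insert's query, with quoted names."""
    fields = list(item.keys())
    columns = ", ".join(quote_identifier(f) for f in fields)
    placeholders = ", ".join(["%s"] * len(fields))
    sql = f"INSERT INTO {quote_identifier(table_name)} ({columns}) VALUES ({placeholders})"
    return sql, [item[f] for f in fields]
```

Inside the class, `table_insert` could then call `self.execute(*build_insert_sql(table_name, item))`. `table_has` and `table_update` could pass their names through `quote_identifier` in the same way.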
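IV. Multithreaded Test Code
The following script starts 10 threads that concurrently insert, query, update and delete rows. It checks that the pool stays stable under concurrent use and that the log output stays readable.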
```python
import threading
import logging

from CoreUtils.Sql import ConnectionPool

# Log format: timestamp, level and thread name on every line
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Initialize the connection pool
pool = ConnectionPool(
    host='localhost',
    database='test_db',
    user='root',
    password='your_password',
    port=3306
)


def test_multithread():
    def thread_task(i):
        name = f"User-{i}"
        age = 20 + i
        try:
            pool.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", (name, age))
            logging.info(f"Inserted: {name}")

            data = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))
            logging.info(f"Query result: {data}")

            pool.execute("UPDATE test_table SET age = age + 1 WHERE name = %s", (name,))
            logging.info("Age updated")

            updated = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))
            logging.info(f"Data after update: {updated}")

            pool.execute("DELETE FROM test_table WHERE name=%s", (name,))
            logging.info("Deleted")

            # The User-{i}_new rows stay in the table, so a second run of this
            # script hits the UNIQUE key here and logs a skipped duplicate.
            pool.table_insert("test_table", {"name": f"{name}_new", "age": age})
            logging.info("table_insert succeeded")

            exists = pool.table_has("test_table", "name", f"{name}_new")
            logging.info(f"table_has check: {exists is not None}")

            pool.table_update("test_table", {"age": 99}, "name", f"{name}_new")
            logging.info("table_update done")
        except Exception:
            logging.exception("Thread error")

    threads = []
    for i in range(10):
        t = threading.Thread(target=thread_task, args=(i,), name=f"Thread-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    logging.info("Multithreaded test finished")


if __name__ == "__main__":
    test_multithread()
```
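The test above creates and joins `threading.Thread` objects by hand. The standard library's `concurrent.futures.ThreadPoolExecutor` can do that bookkeeping, and it also hands each task's exception back to the caller through `Future.result()`. The sketch below uses a hypothetical helper, `run_concurrently`. It assumes the task lets its exceptions propagate, unlike `thread_task`, which catches and logs them itself.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(task, count, max_workers=10):
    """Run task(0) .. task(count - 1) on a thread pool; return {index: exception}."""
    errors = {}
    # thread_name_prefix keeps names readable in the %(threadName)s log field.
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="Worker") as executor:
        futures = {executor.submit(task, i): i for i in range(count)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                future.result()  # re-raises any exception raised inside task
            except Exception as exc:
                logging.error("Task %d failed: %r", index, exc)
                errors[index] = exc
    return errors
```

For example, a module-level version of `thread_task` without the `try`/`except` could be run with `run_concurrently(thread_task, 10)`. If `max_workers` stays at or below the pool's `max_connections`, and nothing else is using the pool, each worker holds at most one connection at a time and never has to wait for one. With `blocking=False`, `PooledDB` raises `TooManyConnections` instead of waiting once the limit is reached.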
V. Create the Test Table
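Create the following table in `test_db` before running the test script. The `UNIQUE` constraint on `name` is what makes a repeated `table_insert` fail with MySQL error 1062, which the wrapper logs as a skipped duplicate.

```sql
CREATE TABLE test_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) UNIQUE NOT NULL,
    age INT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```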
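VI. Sample Output
When the script runs, every log line follows the format configured in the test script, `[timestamp][level][thread name] message`. Because each line carries the thread name, output from the ten threads stays easy to follow even when the lines interleave.
Depending on your project's needs, you can also write the log to a file by passing `filename='xxx.log'` to `logging.basicConfig()`.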
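Passing `filename=` to `logging.basicConfig()` sends output to the file instead of the console. If you want both, one option is to give `basicConfig()` explicit handlers. Below is a minimal sketch with a hypothetical `setup_logging` helper that reuses the test script's format string. It relies on `force=True`, which requires Python 3.8 or later.

```python
import logging
import os

LOG_FORMAT = "[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s"


def setup_logging(log_file: str, level: int = logging.INFO) -> None:
    """Send log records to both the console and log_file (hypothetical helper)."""
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)  # create e.g. logs/ if it is missing
    logging.basicConfig(
        level=level,
        format=LOG_FORMAT,
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[
            logging.StreamHandler(),                          # console (stderr)
            logging.FileHandler(log_file, encoding="utf-8"),  # file, appended to
        ],
        force=True,  # replace any handlers configured earlier (Python 3.8+)
    )
```

In the test script, `setup_logging('logs/crawler.log')` could stand in for the `logging.basicConfig(...)` call. Every line would then appear on the console and in `logs/crawler.log`.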
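VII. Summary
This article presented a thread-safe MySQL connection pool wrapper built on DBUtils `PooledDB`. It then exercised the wrapper with ten concurrent threads performing inserts, queries, updates and deletes. The wrapper reuses pooled connections, returns them to the pool after every call, and keeps multithreaded logs readable. That makes it a reasonable starting point for the database access layer of small to medium-sized Python crawler projects.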