Python 数据库编程
一、数据库连接基础
1. 标准流程
import database_module # 如mysql.connector, sqlite3等
# 1. 建立连接
connection = database_module.connect(
host="localhost",
user="username",
password="password",
database="dbname"
)
# 2. 创建游标
cursor = connection.cursor()
# 3. 执行SQL
cursor.execute("SELECT * FROM users")
# 4. 获取结果
results = cursor.fetchall()
# 5. 关闭资源
cursor.close()
connection.close()
2. 上下文管理器(自动关闭连接)
with database_module.connect(...) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO users VALUES (?, ?, ?)", (1, "Alice", 25))
connection.commit() # 提交事务
二、主流数据库连接方案
1. SQLite(内置,适合小型应用)
import sqlite3
# 连接(自动创建数据库文件)
conn = sqlite3.connect("example.db")
# 创建表
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
age INTEGER
)
""")
# 插入数据
conn.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Bob", 30))
conn.commit()
# 查询数据
for row in conn.execute("SELECT * FROM users"):
print(row)
2. MySQL(使用 mysql-connector-python)
import mysql.connector
conn = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test"
)
cursor = conn.cursor()
cursor.execute("SELECT VERSION()")
print(f"MySQL版本: {cursor.fetchone()}")
3. PostgreSQL(使用 psycopg2)
python
运行
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="mydb",
user="user",
password="pass",
port=5432
)
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM products LIMIT 5")
print(cursor.fetchall())
4. SQL Server(使用 pyodbc)
import pyodbc
conn = pyodbc.connect(
"DRIVER={SQL Server};"
"SERVER=localhost;"
"DATABASE=mydb;"
"UID=user;"
"PWD=password"
)
cursor = conn.cursor()
cursor.execute("SELECT @@VERSION")
print(cursor.fetchone())
三、高级操作技术
1. 参数化查询(防止 SQL 注入)
# SQLite/PostgreSQL风格(?占位符)
cursor.execute("SELECT * FROM users WHERE age > ?", (18,))
# MySQL风格(%s占位符)
cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Charlie",))
# SQL Server风格(:name命名参数)
cursor.execute("UPDATE users SET age=:age WHERE id=:id", {"age": 30, "id": 1})
2. 批量操作
# 批量插入
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", data)
# 批量更新(使用字典参数)
update_data = [
{"id": 1, "age": 26},
{"id": 2, "age": 31}
]
cursor.executemany("UPDATE users SET age=:age WHERE id=:id", update_data)
3. 事务处理
try:
with conn: # 自动提交/回滚
cursor.execute("BEGIN TRANSACTION")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
except Exception as e:
print(f"事务失败: {e}")
四、ORM 框架(对象关系映射)
1. SQLAlchemy(通用 ORM)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建引擎
engine = create_engine("sqlite:///example.db")
# 定义模型
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 添加数据
new_user = User(name="David", age=40)
session.add(new_user)
session.commit()
# 查询数据
users = session.query(User).filter(User.age > 30).all()
print([u.name for u in users]) # 输出: ['David']
2. Django ORM(Django 框架内置)
python
运行
# models.py
from django.db import models
class Product(models.Model):
name = models.CharField(max_length=100)
price = models.DecimalField(max_digits=10, decimal_places=2)
created_at = models.DateTimeField(auto_now_add=True)
# 使用示例
products = Product.objects.filter(price__gt=100).order_by("-created_at")
五、数据库连接池
1. 使用 SQLAlchemy 连接池
python
运行
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://user:pass@host/db",
pool_size=5, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_timeout=30, # 连接超时时间
pool_recycle=3600 # 连接回收时间
)
# 从连接池获取连接
with engine.connect() as conn:
result = conn.execute("SELECT * FROM users")
2. 自定义连接池(以 MySQL 为例)
python
运行
from mysql.connector import pooling
pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
host="localhost",
user="user",
password="pass",
database="test"
)
# 从池中获取连接
connection = pool.get_connection()
cursor = connection.cursor()
六、性能优化技巧
批量操作替代循环:
python
运行
# 低效:
for row in data:
cursor.execute("INSERT INTO table VALUES (%s)", (row,))
# 高效:
cursor.executemany("INSERT INTO table VALUES (%s)", data)
连接复用:
python
运行
# 避免频繁创建连接
# 使用单例模式或连接池管理连接
索引优化:
python
运行
# 确保查询字段有索引
cursor.execute("CREATE INDEX idx_age ON users (age)")
分页查询:
python
运行
page_size = 100
offset = 0
while True:
cursor.execute("SELECT * FROM large_table LIMIT %s OFFSET %s", (page_size, offset))
results = cursor.fetchall()
if not results:
break
# 处理结果
offset += page_size
七、错误处理最佳实践
python
运行
import database_module
try:
conn = database_module.connect(...)
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM non_existent_table") # 故意错误
except database_module.Error as e:
if e.errno == 1146: # 表不存在
print("表不存在,请检查SQL")
elif e.errno == 2006: # 连接丢失
print("数据库连接已断开")
else:
print(f"未知错误: {e}")
finally:
if conn and conn.is_connected():
conn.close()
八、安全注意事项
密码管理:
python
运行
# 不要硬编码密码
import os
password = os.environ.get("DB_PASSWORD")
输入验证:
python
运行
# 对用户输入进行验证
if not isinstance(age, int):
raise ValueError("年龄必须为整数")
最小权限原则:
sql
-- 数据库层面:为应用创建仅拥有必要权限的用户
GRANT SELECT, INSERT ON db.table TO 'app_user'@'localhost';