当前位置: 首页 > news >正文

Python调用SQLite及pandas相关API详解

前言

SQLite是一个轻量级的嵌入式关系数据库,它不需要独立的服务器进程,将数据存储在单一的磁盘文件中。Python内置了sqlite3模块,使得我们可以非常方便地操作SQLite数据库。同时,pandas作为Python数据分析的重要工具,也提供了与SQLite数据库交互的便捷方法。本文将详细介绍如何使用Python调用SQLite数据库,以及pandas与SQLite相关的API。

在这里插入图片描述

一、Python内置sqlite3模块基础操作

1.1 连接数据库

import sqlite3# 连接到SQLite数据库
# 如果文件不存在,会自动在当前目录创建
conn = sqlite3.connect('example.db')# 创建游标对象
cursor = conn.cursor()

1.2 创建表

# 创建表
cursor.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,age INTEGER,email TEXT UNIQUE)
''')# 提交事务
conn.commit()

1.3 插入数据

# 插入单条数据
cursor.execute("INSERT INTO users (name, age, email) VALUES (?, ?, ?)", ('张三', 25, 'zhangsan@example.com'))# 插入多条数据
users_data = [('李四', 30, 'lisi@example.com'),('王五', 28, 'wangwu@example.com'),('赵六', 35, 'zhaoliu@example.com')
]
cursor.executemany("INSERT INTO users (name, age, email) VALUES (?, ?, ?)", users_data)conn.commit()

1.4 查询数据

# 查询所有数据
cursor.execute("SELECT * FROM users")
all_users = cursor.fetchall()
print("所有用户:", all_users)# 查询单条数据
cursor.execute("SELECT * FROM users WHERE age > ?", (30,))
one_user = cursor.fetchone()
print("第一个年龄大于30的用户:", one_user)# 查询多条数据
cursor.execute("SELECT * FROM users WHERE age > ?", (25,))
some_users = cursor.fetchmany(2)
print("前两个年龄大于25的用户:", some_users)

1.5 更新数据

# 更新数据
cursor.execute("UPDATE users SET age = ? WHERE name = ?", (26, '张三'))
conn.commit()print(f"更新了 {cursor.rowcount} 条记录")

1.6 删除数据

# 删除数据
cursor.execute("DELETE FROM users WHERE name = ?", ('赵六',))
conn.commit()print(f"删除了 {cursor.rowcount} 条记录")

1.7 关闭连接

# 关闭游标和连接
cursor.close()
conn.close()

二、使用上下文管理器优化数据库操作

使用上下文管理器可以确保连接被正确关闭,即使在出现异常的情况下:

import sqlite3def get_db_connection(db_name):"""获取数据库连接"""return sqlite3.connect(db_name)# 使用上下文管理器
with get_db_connection('example.db') as conn:cursor = conn.cursor()cursor.execute("SELECT * FROM users")results = cursor.fetchall()print(results)# 连接会在with块结束时自动关闭

三、pandas与SQLite的集成

3.1 读取SQLite数据到DataFrame

import pandas as pd
import sqlite3# 方法1:使用read_sql_query
conn = sqlite3.connect('example.db')
df = pd.read_sql_query("SELECT * FROM users", conn)
print(df)
conn.close()# 方法2:使用read_sql
with sqlite3.connect('example.db') as conn:df = pd.read_sql("SELECT * FROM users WHERE age > 25", conn)print(df)# 方法3:使用read_sql_table(需要表名)
with sqlite3.connect('example.db') as conn:df = pd.read_sql_table('users', conn)print(df)

3.2 将DataFrame写入SQLite

import pandas as pd
import sqlite3# 创建示例DataFrame
data = {'product': ['产品A', '产品B', '产品C', '产品D'],'price': [100, 200, 150, 300],'quantity': [50, 30, 45, 20]
}
df = pd.DataFrame(data)# 将DataFrame写入SQLite
with sqlite3.connect('example.db') as conn:# if_exists='replace' 替换表,'append' 追加数据,'fail' 如果表存在则失败df.to_sql('products', conn, if_exists='replace', index=False)# 验证数据已写入result = pd.read_sql("SELECT * FROM products", conn)print(result)

3.3 使用pandas进行复杂查询

import pandas as pd
import sqlite3with sqlite3.connect('example.db') as conn:# 联合查询query = """SELECT u.name, u.age, COUNT(p.id) as product_countFROM users uLEFT JOIN purchases p ON u.id = p.user_idGROUP BY u.id"""df = pd.read_sql_query(query, conn)# 使用参数化查询params = {'min_age': 25, 'max_age': 35}query_with_params = """SELECT * FROM users WHERE age BETWEEN :min_age AND :max_age"""df_filtered = pd.read_sql_query(query_with_params, conn, params=params)

四、高级用法和最佳实践

4.1 创建自定义函数

import sqlite3
import mathdef create_custom_functions(conn):"""创建自定义SQL函数"""# 创建平方根函数conn.create_function("sqrt", 1, math.sqrt)# 创建自定义聚合函数class StandardDeviation:def __init__(self):self.values = []def step(self, value):if value is not None:self.values.append(value)def finalize(self):if not self.values:return Nonemean = sum(self.values) / len(self.values)variance = sum((x - mean) ** 2 for x in self.values) / len(self.values)return math.sqrt(variance)conn.create_aggregate("stdev", 1, StandardDeviation)# 使用自定义函数
with sqlite3.connect('example.db') as conn:create_custom_functions(conn)# 使用自定义函数cursor = conn.cursor()cursor.execute("SELECT sqrt(16)")print("sqrt(16) =", cursor.fetchone()[0])cursor.execute("SELECT stdev(age) FROM users")print("年龄标准差 =", cursor.fetchone()[0])

4.2 事务处理

import sqlite3def transfer_money(conn, from_account, to_account, amount):"""转账操作示例"""try:cursor = conn.cursor()# 开始事务conn.execute("BEGIN TRANSACTION")# 从源账户扣款cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_account))# 向目标账户加款cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_account))# 提交事务conn.commit()print("转账成功")except Exception as e:# 回滚事务conn.rollback()print(f"转账失败: {e}")

4.3 数据库连接池

import sqlite3
from queue import Queue
import threadingclass SQLiteConnectionPool:def __init__(self, database, max_connections=5):self.database = databaseself.max_connections = max_connectionsself.connections = Queue(maxsize=max_connections)self.lock = threading.Lock()# 初始化连接池for _ in range(max_connections):conn = sqlite3.connect(database, check_same_thread=False)self.connections.put(conn)def get_connection(self):"""获取连接"""return self.connections.get()def return_connection(self, conn):"""归还连接"""self.connections.put(conn)def close_all(self):"""关闭所有连接"""while not self.connections.empty():conn = self.connections.get()conn.close()# 使用连接池
pool = SQLiteConnectionPool('example.db', max_connections=3)# 获取连接
conn = pool.get_connection()
try:cursor = conn.cursor()cursor.execute("SELECT * FROM users")results = cursor.fetchall()print(results)
finally:# 归还连接pool.return_connection(conn)# 关闭连接池
pool.close_all()

4.4 性能优化技巧

import sqlite3
import timedef optimize_sqlite_performance():conn = sqlite3.connect('example.db')cursor = conn.cursor()# 1. 使用批量插入而不是单条插入start_time = time.time()data = [(f'User{i}', 20+i, f'user{i}@example.com') for i in range(1000)]cursor.executemany("INSERT INTO users (name, age, email) VALUES (?, ?, ?)", data)conn.commit()print(f"批量插入1000条数据用时: {time.time() - start_time:.2f}秒")# 2. 创建索引加速查询cursor.execute("CREATE INDEX idx_age ON users(age)")cursor.execute("CREATE INDEX idx_email ON users(email)")# 3. 使用PRAGMA优化cursor.execute("PRAGMA journal_mode=WAL")  # 写前日志模式cursor.execute("PRAGMA synchronous=NORMAL")  # 同步模式cursor.execute("PRAGMA cache_size=10000")  # 缓存大小# 4. 使用预编译语句stmt = cursor.execute("SELECT * FROM users WHERE age > ?", (25,))conn.close()

五、错误处理和调试

import sqlite3
import logging# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def safe_database_operation(db_name, query, params=None):"""安全的数据库操作"""conn = Nonetry:conn = sqlite3.connect(db_name)conn.row_factory = sqlite3.Row  # 使结果可以通过列名访问cursor = conn.cursor()if params:cursor.execute(query, params)else:cursor.execute(query)if query.strip().upper().startswith('SELECT'):return cursor.fetchall()else:conn.commit()return cursor.rowcountexcept sqlite3.IntegrityError as e:logger.error(f"数据完整性错误: {e}")raiseexcept sqlite3.OperationalError as e:logger.error(f"操作错误: {e}")raiseexcept Exception as e:logger.error(f"未知错误: {e}")raisefinally:if conn:conn.close()# 使用示例
try:result = safe_database_operation('example.db', "SELECT * FROM users WHERE age > ?", (25,))for row in result:print(f"姓名: {row['name']}, 年龄: {row['age']}")
except Exception as e:print(f"操作失败: {e}")

六、pandas高级集成技巧

6.1 使用chunksize处理大数据

import pandas as pd
import sqlite3def process_large_data(db_name, table_name, chunksize=1000):"""分块处理大数据"""conn = sqlite3.connect(db_name)# 分块读取数据chunks = pd.read_sql_query(f"SELECT * FROM {table_name}", conn, chunksize=chunksize)total_rows = 0for chunk in chunks:# 处理每个数据块processed_chunk = chunk[chunk['age'] > 25]total_rows += len(processed_chunk)# 可以将处理后的数据写回数据库processed_chunk.to_sql('processed_users', conn, if_exists='append', index=False)conn.close()return total_rows

6.2 数据类型映射

import pandas as pd
import sqlite3
import numpy as np# SQLite到pandas数据类型映射
dtype_mapping = {'INTEGER': 'int64','REAL': 'float64','TEXT': 'object','BLOB': 'object','NULL': 'object'
}def read_with_dtypes(db_name, query):"""读取数据并正确设置数据类型"""conn = sqlite3.connect(db_name)# 先获取列信息cursor = conn.cursor()cursor.execute(query)column_names = [description[0] for description in cursor.description]# 读取数据df = pd.read_sql_query(query, conn)# 自动类型转换for col in df.columns:if df[col].dtype == 'object':try:df[col] = pd.to_numeric(df[col])except ValueError:try:df[col] = pd.to_datetime(df[col])except ValueError:passconn.close()return df

6.3 复杂数据操作示例

import pandas as pd
import sqlite3
from datetime import datetime, timedeltadef complex_data_operations():conn = sqlite3.connect('sales.db')# 创建示例数据dates = pd.date_range(start='2024-01-01', end='2024-12-31', freq='D')sales_data = {'date': dates,'product_id': np.random.randint(1, 6, size=len(dates)),'quantity': np.random.randint(10, 100, size=len(dates)),'price': np.random.uniform(10, 1000, size=len(dates))}df_sales = pd.DataFrame(sales_data)df_sales['total'] = df_sales['quantity'] * df_sales['price']# 写入数据库df_sales.to_sql('sales', conn, if_exists='replace', index=False)# 复杂查询:每月销售汇总query = """SELECT strftime('%Y-%m', date) as month,product_id,SUM(quantity) as total_quantity,SUM(total) as total_revenue,AVG(price) as avg_priceFROM salesGROUP BY strftime('%Y-%m', date), product_idORDER BY month, product_id"""monthly_summary = pd.read_sql_query(query, conn)# 使用pandas进行进一步分析# 计算环比增长monthly_summary['revenue_growth'] = monthly_summary.groupby('product_id')['total_revenue'].pct_change()# 写回数据库monthly_summary.to_sql('monthly_summary', conn, if_exists='replace', index=False)conn.close()return monthly_summary

七、Navicat可视化工具链接SQLite

定位到对应的.db文件,不需要输入账号密码即可查看数据库

在这里插入图片描述

总结

本文详细介绍了Python调用SQLite数据库的方法,包括使用内置的sqlite3模块进行基本的CRUD操作,以及使用pandas进行更高级的数据操作。主要涵盖以下内容:

  1. 基础操作:连接数据库、创建表、插入数据、查询数据、更新和删除数据
  2. 上下文管理器:使用with语句自动管理数据库连接
  3. pandas集成:读取SQLite数据到DataFrame、将DataFrame写入SQLite
  4. 高级用法:自定义函数、事务处理、连接池、性能优化
  5. 错误处理:异常捕获和日志记录
  6. 大数据处理:使用chunksize分块处理大量数据

通过合理使用这些API和技巧,我们可以高效地在Python中操作SQLite数据库,实现数据的存储、查询和分析。SQLite作为轻量级数据库,非常适合中小型应用、原型开发和数据分析任务。结合pandas的强大数据处理能力,可以构建功能完善的数据处理解决方案。

参考资源

  • Python sqlite3官方文档
  • pandas SQL查询文档
  • SQLite官方文档
http://www.xdnf.cn/news/440965.html

相关文章:

  • 解密企业级大模型智能体Agentic AI 关键技术:MCP、A2A、Reasoning LLMs-强化学习算法
  • 机器学习第十一讲:标准化 → 把厘米和公斤单位统一成标准值
  • 对抗系统熵增:从被动救火到主动防御的稳定性实战
  • R利用spaa包计算植物/微生物的生态位宽度和重叠指数
  • 序列化和反序列化hadoop实现
  • Math工具类全面指南
  • OpenCV CUDA 模块中用于在 GPU 上计算矩阵中每个元素的绝对值或复数的模函数abs()
  • 量子算法:开启计算新时代的技术密码
  • MATLAB实现振幅调制(AM调制信号)
  • Hadoop-HDFS-Packet含义及作用
  • 通用软件项目技术报告 - 术语词典
  • 【数据分析】从TCGA下载所有癌症的多组学数据
  • 掌握Docker Commit:轻松创建自定义镜像
  • 【MySQL】自适应哈希详解:作用、配置以及如何查看
  • Windows10安装WSA
  • ECharts中Map(地图)样式配置、渐变色生成
  • OracleLinux7.9-ssh问题
  • Windows避坑部署CosyVoice多语言大语言模型
  • CSS Grid布局:从入门到实战
  • 《Python星球日记》 第70天:Seq2Seq 与Transformer Decoder
  • sql练习题
  • springboot + mysql8降低版本到 mysql5.7
  • Java中的异常机制
  • Java 直接内存ByteBuffer.allocateDirect原理与源码解析
  • git切换分支后需要pull吗
  • Spark缓存---cache方法
  • 在Ubuntu24.04中配置开源直线特征提取软件DeepLSD
  • Java 与 Go 语言对比
  • Milvus 视角看主流嵌入式模型(Embeddings)
  • 推荐一个Winform开源的UI工具包