# PostgreSQL MCP Use Cases
## Overview
PostgreSQL MCP (PostgreSQL Multi-host Cluster Provisioning) is a tool and architecture for deploying and managing multi-node PostgreSQL clusters. It provides cluster management, high-availability safeguards, and load balancing. This document walks through the basics of using PostgreSQL MCP and its common application scenarios.
## Environment Setup
### Installing PostgreSQL MCP
```bash
pip install pg-mcp
```
### Basic Configuration
Create the configuration file `pgmcp_config.json`:
```json
{
  "masters": [
    {
      "host": "primary database 1 IP",
      "port": 5432,
      "user": "postgres",
      "password": "password",
      "database": "database name"
    },
    {
      "host": "primary database 2 IP",
      "port": 5432,
      "user": "postgres",
      "password": "password",
      "database": "database name"
    }
  ],
  "replicas": [
    {
      "host": "read replica 1 IP",
      "port": 5432,
      "user": "postgres",
      "password": "password",
      "database": "database name"
    },
    {
      "host": "read replica 2 IP",
      "port": 5432,
      "user": "postgres",
      "password": "password",
      "database": "database name"
    }
  ],
  "connection_pool": {
    "min_connections": 5,
    "max_connections": 20,
    "idle_timeout": 300
  },
  "high_availability": {
    "failover_timeout": 30,
    "max_retry_attempts": 3,
    "enable_auto_failover": true
  }
}
```
## Basic Use Cases
### Case 1: Connecting to the Database Cluster
```python
from pg_mcp import ConnectionPool

# Initialize the connection pool
pool = ConnectionPool.from_config("pgmcp_config.json")

# Acquire a connection
connection = pool.get_connection()
try:
    # Use the connection
    with connection.cursor() as cursor:
        cursor.execute("SELECT version()")
        version = cursor.fetchone()
        print(f"PostgreSQL version: {version[0]}")
finally:
    # Return the connection to the pool
    connection.close()
```
### Case 2: Read/Write Splitting
```python
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")

# Write operations go to a primary (master) node
def insert_data(name, age):
    connection = pool.get_master_connection()
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO users (name, age) VALUES (%s, %s)"
            cursor.execute(sql, (name, age))
        connection.commit()
    finally:
        connection.close()

# Read operations go to a read-only replica
def get_user(user_id):
    connection = pool.get_replica_connection()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            return cursor.fetchone()
    finally:
        connection.close()

# Usage example
insert_data("张三", 25)
user = get_user(1)
print(user)
```
### Case 3: Transaction Handling and MVCC Optimization
```python
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")

def transfer_money(from_account, to_account, amount):
    connection = pool.get_master_connection()
    try:
        # The driver starts a transaction implicitly, so no explicit BEGIN is needed
        with connection.cursor() as cursor:
            # Check the balance, locking the row with FOR UPDATE to avoid concurrent updates
            cursor.execute("SELECT balance FROM accounts WHERE id = %s FOR UPDATE", (from_account,))
            from_balance = cursor.fetchone()[0]
            if from_balance < amount:
                raise Exception("Insufficient balance")
            # Debit the source account
            cursor.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                           (amount, from_account))
            # Credit the destination account
            cursor.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                           (amount, to_account))
        connection.commit()
        return True
    except Exception as e:
        connection.rollback()
        print(f"Transfer failed: {e}")
        return False
    finally:
        connection.close()
```
### Case 4: Batch Operations and the COPY Command
```python
from pg_mcp import ConnectionPool
import io
import csv

pool = ConnectionPool.from_config("pgmcp_config.json")

# Batch insert with executemany
def batch_insert(users):
    connection = pool.get_master_connection()
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
            cursor.executemany(sql, users)
        connection.commit()
        print(f"Inserted {len(users)} rows")
    finally:
        connection.close()

# Use PostgreSQL's COPY command for large bulk loads (much faster than row-by-row inserts)
def bulk_copy(users):
    connection = pool.get_master_connection()
    try:
        # Prepare the data as CSV in memory
        # Note: copy_from expects plain text rows; if values may contain commas or
        # quotes, prefer a COPY ... CSV variant that handles quoting.
        csv_data = io.StringIO()
        csv_writer = csv.writer(csv_data)
        for user in users:
            csv_writer.writerow(user)
        csv_data.seek(0)
        with connection.cursor() as cursor:
            cursor.copy_from(csv_data, 'users', sep=',', columns=('name', 'age', 'email'))
        connection.commit()
        print(f"Bulk-loaded {len(users)} rows")
    finally:
        connection.close()

# Batch insert example
users_data = [
    ("李四", 30, "lisi@example.com"),
    ("王五", 25, "wangwu@example.com"),
    ("赵六", 35, "zhaoliu@example.com")
]
batch_insert(users_data)
```
### Case 5: Connection Pool Monitoring and Management
```python
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")

# Report connection pool status
def get_pool_status():
    status = pool.get_status()
    print(f"Total connections: {status['total_connections']}")
    print(f"Active connections: {status['active_connections']}")
    print(f"Idle connections: {status['idle_connections']}")
    print(f"Waiting requests: {status['waiting_connections']}")
    return status

# Monitor replication lag
def check_replication_lag():
    connection = pool.get_master_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT client_addr, state, sent_lsn, write_lsn,
                       pg_wal_lsn_diff(sent_lsn, write_lsn) AS lag_bytes
                FROM pg_stat_replication
            """)
            return cursor.fetchall()
    finally:
        connection.close()

# Usage example
get_pool_status()
lag_info = check_replication_lag()
for replica in lag_info:
    print(f"Replica: {replica[0]}, state: {replica[1]}, lag: {replica[4]} bytes")
```
## Advanced Usage
### Custom Load Balancing Strategy
```python
from pg_mcp import ConnectionPool, LoadBalancer

class CustomLoadBalancer(LoadBalancer):
    def select_replica(self, replicas):
        # Custom replica-selection logic,
        # e.g. pick the replica with the lowest current load
        return min(replicas, key=lambda replica: replica.current_load)

# Use the custom load balancer
pool = ConnectionPool.from_config("pgmcp_config.json", load_balancer=CustomLoadBalancer())
```
### Failover and Automatic Recovery
```python
from pg_mcp import ConnectionPool, FailoverStrategy
import time

# Failover configuration
config = {
    "failover": {
        "check_interval": 5,
        "max_retry_attempts": 3,
        "retry_delay": 1,
        "promote_replica": True
    }
}
pool = ConnectionPool.from_config("pgmcp_config.json",
                                  failover_strategy=FailoverStrategy(**config["failover"]))

# Query execution with retry-based failover
def execute_with_failover(sql, params=None):
    retries = 0
    while retries < 3:
        try:
            connection = pool.get_connection()
            try:
                with connection.cursor() as cursor:
                    cursor.execute(sql, params)
                    return cursor.fetchall()
            finally:
                connection.close()
        except Exception as e:
            retries += 1
            if retries >= 3:
                raise Exception(f"Query failed after 3 retries: {e}")
            print(f"Query failed, retrying ({retries}/3)")
            time.sleep(config["failover"]["retry_delay"])  # back off before retrying
```
### Using PostgreSQL-Specific Features
```python
from pg_mcp import ConnectionPool
import json

pool = ConnectionPool.from_config("pgmcp_config.json")

# Store JSON data using the jsonb type
def store_json_data(user_id, preferences):
    connection = pool.get_master_connection()
    try:
        with connection.cursor() as cursor:
            # PostgreSQL can store JSON documents natively
            cursor.execute(
                "INSERT INTO user_preferences (user_id, preferences) VALUES (%s, %s::jsonb)",
                (user_id, json.dumps(preferences))
            )
        connection.commit()
    finally:
        connection.close()

# Full-text search
# Note: the 'chinese' text search configuration is not built in; it requires an
# extension such as zhparser or pg_jieba ('english'/'simple' ship with PostgreSQL).
def search_products(query):
    connection = pool.get_replica_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT id, name, description
                FROM products
                WHERE to_tsvector('chinese', name || ' ' || description) @@ plainto_tsquery('chinese', %s)
                ORDER BY ts_rank(to_tsvector('chinese', name), plainto_tsquery('chinese', %s)) DESC
            """, (query, query))
            return cursor.fetchall()
    finally:
        connection.close()
```
## Performance Tuning Tips
1. **Size the connection pool appropriately**: tune the minimum and maximum connection counts to match server capacity and workload. PostgreSQL's default `max_connections` is 100; the combined pool size across all application instances should stay below that value, as sketched below.
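A minimal sketch of this check, assuming the `pg_mcp` pool API used throughout this document; `APP_INSTANCES` and `max_pool_size` are illustrative values you would replace with your own deployment figures.
```python
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")
APP_INSTANCES = 4   # hypothetical: number of application processes sharing this cluster
max_pool_size = 20  # should match connection_pool.max_connections in pgmcp_config.json

# Read the server-side connection limit
connection = pool.get_master_connection()
try:
    with connection.cursor() as cursor:
        cursor.execute("SHOW max_connections")
        max_connections = int(cursor.fetchone()[0])
finally:
    connection.close()

# Compare the worst-case pool usage against the server limit
projected = max_pool_size * APP_INSTANCES
if projected >= max_connections:
    print(f"Warning: {projected} potential connections exceed max_connections={max_connections}")
else:
    print(f"OK: {projected} potential connections within max_connections={max_connections}")
```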
2. **Use prepared statements**: for frequently executed SQL, prepared statements reduce parsing overhead.
```python
user_id = 1  # example id
connection = pool.get_connection()
try:
    with connection.cursor() as cursor:
        cursor.execute("PREPARE get_user AS SELECT * FROM users WHERE id = $1")
        cursor.execute("EXECUTE get_user(%s)", (user_id,))
        result = cursor.fetchone()
finally:
    connection.close()
```
3. **Tune PostgreSQL configuration parameters** (the sketch after this list shows how to read the current values):
   - `shared_buffers`: typically around 25% of system memory
   - `work_mem`: memory available to sort and hash operations
   - `maintenance_work_mem`: speeds up VACUUM and other maintenance operations
   - `effective_cache_size`: an estimate of the cache available to the system
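As a quick way to review these settings, the sketch below (assuming the same `pg_mcp` API as the examples above) reads their current values from the standard `pg_settings` view.
```python
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")

connection = pool.get_replica_connection()
try:
    with connection.cursor() as cursor:
        # pg_settings exposes the current value and unit of every server parameter
        cursor.execute("""
            SELECT name, setting, unit
            FROM pg_settings
            WHERE name IN ('shared_buffers', 'work_mem',
                           'maintenance_work_mem', 'effective_cache_size')
        """)
        for name, setting, unit in cursor.fetchall():
            print(f"{name} = {setting} {unit or ''}".strip())
finally:
    connection.close()
```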
4. **Monitor the connection pool**: check pool status regularly to catch connection leaks and resource exhaustion early; a minimal periodic check is sketched below.
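A minimal periodic check, assuming the `get_status()` API shown in Case 5; a steadily growing active count with few idle connections is a typical sign of a connection leak.
```python
import time
from pg_mcp import ConnectionPool

pool = ConnectionPool.from_config("pgmcp_config.json")

def monitor_pool(interval_seconds=60, cycles=10):
    # Log pool usage at a fixed interval so trends (e.g. leaks) are visible
    for _ in range(cycles):
        status = pool.get_status()
        print(f"active={status['active_connections']} "
              f"idle={status['idle_connections']} "
              f"waiting={status['waiting_connections']}")
        time.sleep(interval_seconds)

monitor_pool(interval_seconds=30, cycles=5)
```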
5. **Use PostgreSQL parallel query**: enabling parallel workers can speed up queries over large tables.
```sql
SET max_parallel_workers_per_gather = 4;
```
## Summary
PostgreSQL MCP provides cluster management, high availability, and read/write splitting for PostgreSQL. With sensible configuration it can noticeably improve the performance, reliability, and scalability of a PostgreSQL deployment, particularly when combined with PostgreSQL's own strengths such as JSONB support, full-text search, and MVCC-based concurrency control.
In practice, adjust the PostgreSQL MCP configuration to your workload and business requirements, and treat regular performance monitoring and maintenance as essential for keeping the system stable.