当前位置: 首页 > ai >正文

asyncpg - Python异步PostgreSQL客户端库

文章目录

    • 一、关于asyncpg
      • 1、项目概览
      • 2、相关链接资源
      • 3、功能特性
    • 二、安装配置
    • 三、基本用法
    • 四、asyncpg 使用指南
      • 1、类型转换
      • 2、自定义类型转换
        • 示例:自动 JSON 转换
        • 示例:复杂类型
        • 示例:PostGIS 类型的自动转换
        • 示例:将数值列解码为浮点数
        • 示例:解码 hstore 值
      • 3、事务处理
      • 4、连接池
    • 五、常见问题解答
      • asyncpg 支持 DB-API 吗?
      • 能否将 asyncpg 与 SQLAlchemy ORM 结合使用?
      • 能否对 `asyncpg.Record` 使用点号访问?这样看起来更简洁。
      • 为什么不能在事务外使用 cursor ?
      • 为什么会出现预处理语句错误?
      • 为什么使用 `expression IN $1` 时会报 `PostgresSyntaxError` 错误?


一、关于asyncpg

1、项目概览

asyncpg 是专为PostgreSQL和Python/asyncio设计的数据库接口库,通过原生实现PostgreSQL服务器二进制协议,为Python的asyncio框架提供高效简洁的数据库访问方案。

技术定位:

  • 性能:平均比psycopg3快5倍
  • 协议支持:PostgreSQL 9.5至17版本
  • 语言要求:Python 3.8+

2、相关链接资源

  • GitHub:https://github.com/MagicStack/asyncpg
  • 官方文档:https://magicstack.github.io/asyncpg/current/
  • PyPI:https://pypi.python.org/pypi/asyncpg
  • 性能测试工具:https://github.com/MagicStack/pgbench
  • 技术博客:http://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/
  • License:Apache 2.0

状态标识:

  • 测试状态:https://github.com/MagicStack/asyncpg/actions?query=workflow%3ATests+branch%3Amaster
  • PyPI版本:https://img.shields.io/pypi/v/asyncpg.svg
  • 官方性能白皮书:https://gistpreview.github.io/?0ed296e93523831ea0918d42dd1258c2

3、功能特性

1、协议级功能支持

  • 原生实现PostgreSQL协议
  • 支持预处理语句和可滚动游标

2、高级数据类型处理

  • 复合类型自动编解码
  • 数组及嵌套结构支持

3、查询控制

  • 结果集部分迭代
  • 自定义数据类型支持

二、安装配置

asyncpg 可通过 PyPI 获取。当不使用 GSSAPI/SSPI 认证时,它没有任何依赖项。使用 pip 进行安装:

$ pip install asyncpg

如果需要 GSSAPI/SSPI 认证,请使用:

$ pip install 'asyncpg[gssauth]'

更多详情,请参阅文档。


三、基本用法

import asyncio
import asyncpgasync def run():conn = await asyncpg.connect(user='user', password='password',database='database', host='127.0.0.1')values = await conn.fetch('SELECT * FROM mytable WHERE id = $1',10,)await conn.close()asyncio.run(run())

四、asyncpg 使用指南

与数据库的交互通常从调用 connect() 开始,该方法会建立一个新的数据库会话并返回一个 Connection 实例。该连接实例提供了执行查询和管理事务的方法。

import asyncio
import asyncpg
import datetimeasync def main():# Establish a connection to an existing database named "test"# as a "postgres" user.conn = await asyncpg.connect('postgresql://postgres@localhost/test')# Execute a statement to create a new table.await conn.execute('''CREATE TABLE users(id serial PRIMARY KEY,name text,dob date)''')# Insert a record into the created table.await conn.execute('''INSERT INTO users(name, dob) VALUES($1, $2)''', 'Bob', datetime.date(1984, 3, 1))# Select a row from the table.row = await conn.fetchrow('SELECT * FROM users WHERE name = $1', 'Bob')# *row* now contains# asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1))# Close the connection.await conn.close()asyncio.run(main())

注意:asyncpg 使用 PostgreSQL 原生语法 $n 作为查询参数。


1、类型转换

asyncpg 能够自动将 PostgreSQL 类型转换为对应的 Python 类型,反之亦然。所有标准数据类型都开箱即用,包括数组、复合类型、范围类型、枚举及其任意组合。对于非标准类型,可以自定义编解码器,或覆盖标准编解码器。更多信息请参阅自定义类型转换。

下表展示了 PostgreSQL 与 Python 类型的对应关系:

PostgreSQL 类型Python 类型
anyarraylist
anyenumstr
anyrangeasyncpg.Range, tuple
anymultirangelist[asyncpg.Range ], list[tuple ] 1
recordasyncpg.Record, tuple, Mapping
bit, varbitasyncpg.BitString
boolbool
boxasyncpg.Box
byteabytes
char, name, varchar, text, xmlstr
cidripaddress.IPv4Network, ipaddress.IPv6Network
inetipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Address, ipaddress.IPv6Address 2
macaddrstr
circleasyncpg.Circle
datedatetime.date
time无时区偏移的 datetime.time
time with time zone有时区偏移的 datetime.time
timestamp无时区偏移的 datetime.datetime
timestamp with time zone有时区偏移的 datetime.datetime
intervaldatetime.timedelta
float, double precisionfloat 3
smallint, integer, bigintint
numericDecimal
json, jsonbstr
lineasyncpg.Line
lsegasyncpg.LineSegment
moneystr
pathasyncpg.Path
pointasyncpg.Point
polygonasyncpg.Polygon
uuiduuid.UUID
tidtuple

其他所有类型默认按文本格式编码/解码。

[1] 自 0.25.0 版本起支持
[2] 在 0.20.0 版本之前,asyncpg 错误地将带前缀的 inet 值处理为 IPvXNetwork 而非 IPvXInterface
[3] 单精度 float 值解码为 Python float 时可能存在精度差异,这是有限精度浮点类型的固有特性。如需精确匹配十进制表示,请在查询中将表达式转换为 doublenumeric 类型。


2、自定义类型转换

asyncpg 允许通过 Connection.set_type_codec()Connection.set_builtin_type_codec() 方法为标准类型及用户自定义类型定义类型转换函数。


示例:自动 JSON 转换

以下示例展示了如何配置 asyncpg,使其使用 json 模块对 JSON 值进行编码和解码。

import asyncio
import asyncpg
import jsonasync def main():conn = await asyncpg.connect()try:await conn.set_type_codec('json',encoder=json.dumps,decoder=json.loads,schema='pg_catalog')data = {'foo': 'bar', 'spam': 1}res = await conn.fetchval('SELECT $1::json', data)finally:await conn.close()asyncio.run(main())

示例:复杂类型

以下示例展示了如何配置 asyncpg,将 Python 的 complex 值编码和解码为 PostgreSQL 中的自定义复合类型。


import asyncio
import asyncpgasync def main():conn = await asyncpg.connect()try:await conn.execute('''CREATE TYPE mycomplex AS (r float,i float);''')await conn.set_type_codec('complex',encoder=lambda x: (x.real, x.imag),decoder=lambda t: complex(t[0], t[1]),format='tuple',)res = await conn.fetchval('SELECT $1::mycomplex', (1+2j))finally:await conn.close()asyncio.run(main())

示例:PostGIS 类型的自动转换

以下示例展示了如何配置 asyncpg 来编码和解码 PostGIS 的 geometry 类型。该方案适用于任何符合 geo 接口规范 的 Python 对象,并依赖于 Shapely 库实现,但任何支持读写 WKB 格式的库均可使用。

import asyncio
import asyncpgimport shapely.geometry
import shapely.wkb
from shapely.geometry.base import BaseGeometryasync def main():conn = await asyncpg.connect()try:def encode_geometry(geometry):if not hasattr(geometry, '__geo_interface__'):raise TypeError('{g} does not conform to ''the geo interface'.format(g=geometry))shape = shapely.geometry.shape(geometry)return shapely.wkb.dumps(shape)def decode_geometry(wkb):return shapely.wkb.loads(wkb)await conn.set_type_codec('geometry',  # also works for 'geography'encoder=encode_geometry,decoder=decode_geometry,format='binary',)data = shapely.geometry.Point(-73.985661, 40.748447)res = await conn.fetchrow('''SELECT 'Empire State Building' AS name,$1::geometry            AS coordinates''',data)print(res)finally:await conn.close()asyncio.run(main())

示例:将数值列解码为浮点数

默认情况下,asyncpg 会将数值列解码为 Python Decimal 实例。以下示例展示如何配置 asyncpg 改用浮点数进行解码。

import asyncio
import asyncpgasync def main():conn = await asyncpg.connect()try:await conn.set_type_codec('numeric', encoder=str, decoder=float,schema='pg_catalog', format='text')res = await conn.fetchval("SELECT $1::numeric", 11.123)print(res, type(res))finally:await conn.close()asyncio.run(main())

示例:解码 hstore 值

hstore 是一种用于存储键/值对的扩展数据类型。asyncpg 内置了一个编解码器,可以将 hstore 值解码为 dict 对象或进行反向编码。

由于 hstore 不是内置类型,必须通过 Connection.set_builtin_type_codec() 方法在连接上注册该编解码器:

import asyncpg
import asyncioasync def run():conn = await asyncpg.connect()# Assuming the hstore extension exists in the public schema.await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")assert result == {'a': '1', 'b': '2', 'c': None}asyncio.run(run())

3、事务处理

要创建事务,应使用 Connection.transaction() 方法。

最常见的使用事务方式是通过 async with 语句:

async with connection.transaction():await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")

注意:当不在显式事务块中时,对数据库的任何修改都会立即生效。这种行为也被称为自动提交

更多信息请参阅事务 API文档。


4、连接池

对于服务器类应用而言,这类应用需要频繁处理请求,且在请求处理期间仅需短暂使用数据库连接,此时推荐使用连接池。asyncpg 提供了高级的池化实现,无需依赖外部连接池工具(如 PgBouncer)。

要创建连接池,请使用 asyncpg.create_pool() 函数。随后可通过生成的 Pool 对象从池中借用连接。

以下示例展示了如何用 asyncpg 实现一个计算二次幂的简单 Web 服务。


import asyncio
import asyncpg
from aiohttp import webasync def handle(request):"""Handle incoming requests."""pool = request.app['pool']power = int(request.match_info.get('power', 10))# Take a connection from the pool.async with pool.acquire() as connection:# Open a transaction.async with connection.transaction():# Run the query passing the request argument.result = await connection.fetchval('select 2 ^ $1', power)return web.Response(text="2 ^ {} is {}".format(power, result))async def init_db(app):"""Initialize a connection pool."""app['pool'] = await asyncpg.create_pool(database='postgres',user='postgres')yieldawait app['pool'].close()def init_app():"""Initialize the application server."""app = web.Application()# Create a database contextapp.cleanup_ctx.append(init_db)# Configure service routesapp.router.add_route('GET', '/{power:\d+}', handle)app.router.add_route('GET', '/', handle)return appapp = init_app()
web.run_app(app)

请参阅 Connection Pools API 文档获取更多信息。


五、常见问题解答

asyncpg 支持 DB-API 吗?

不支持。DB-API 是一个同步 API,而 asyncpg 基于异步 I/O 模型构建。因此,无法实现与 DB-API 的完全兼容,我们决定将 asyncpg 的 API 设计得更贴近 PostgreSQL 的架构和术语。未来某个时间点,我们会发布一个同步的、兼容 DB-API 的 asyncpg 版本。


能否将 asyncpg 与 SQLAlchemy ORM 结合使用?

可以。SQLAlchemy 1.4 及更高版本原生支持 asyncpg 方言,具体用法请参考其官方文档。对于较旧版本的 SQLAlchemy,可通过第三方适配器(如 asyncpgsa 或 databases)实现集成。


能否对 asyncpg.Record 使用点号访问?这样看起来更简洁。

我们决定不将 asyncpg.Record 实现为具名元组,因为需要保持 Record 方法命名空间与列命名空间的隔离。不过,你可以通过 connect() 或任何返回 Record 的方法的 record_class 参数,提供一个支持点号访问的自定义 Record 类。

class MyRecord(asyncpg.Record):def __getattr__(self, name):return self[name]

为什么不能在事务外使用 cursor ?

通过调用 Connection.cursor()PreparedStatement.cursor() 创建的游标不能在事务外使用。任何此类尝试都会导致 InterfaceError。如需创建可在事务外使用的游标,请直接使用 DECLARE ... CURSOR WITH HOLD SQL 语句。


为什么会出现预处理语句错误?

如果你间歇性遇到 prepared statement "__asyncpg_stmt_xx__" does not existprepared statement “__asyncpg_stmt_xx__” already exists 错误,很可能是因为你没有直接连接 PostgreSQL 服务器,而是通过 pgbouncer 进行连接。当 pgbouncer 处于 "transaction""statement" 连接池模式时,不支持预处理语句。你有以下几种解决方案:

  • 如果使用 pgbouncer 只是为了降低新建连接的开销(而不是为了通过连接池支持大量客户端以实现更好的扩展性),可以改用 asyncpg 提供的连接池功能,这会是更合适的选择;
  • 通过向 asyncpg.connect()asyncpg.create_pool() 传递 statement_cache_size=0 参数来禁用自动使用预处理语句(同时避免使用 Connection.prepare());
  • 将 pgbouncer 的 pool_mode 切换为 session 模式。

为什么使用 expression IN $1 时会报 PostgresSyntaxError 错误?

expression IN $1 不是有效的 PostgreSQL 语法。若要对一个序列进行值检查,请使用 expression = any($1::mytype[]) 语法,其中 mytype 表示数组元素类型。


伊织 xAI 2025-06-01(日)

http://www.xdnf.cn/news/10354.html

相关文章:

  • 4、获取树莓派温度
  • Jenkins:自动化流水线的基石,开启 DevOps 新时代
  • 【Linux网络编程】数据链路层
  • 6个月Python学习计划 Day 11 - 列表推导式、内置函数进阶、模块封装实战
  • 让编程更智能高效:探索Claude Code工具的强大功能
  • Qt源码分析:QDataStream
  • 多模态大语言模型arxiv论文略读(103)
  • 功能丰富的PDF处理免费软件推荐
  • 设计模式——命令设计模式(行为型)
  • while循环判断数字位数
  • Axure组件即拖即用:垂直折叠菜单(动态展开/收回交互)
  • day16 leetcode-hot100-31(链表10)
  • DAY40 训练和测试
  • Vue 核心技术与实战智慧商城项目Day08-10
  • 麦克风和电脑内播放声音实时识别转文字软件FunASR整合包V5下载
  • 关于langchain使用API加载model的方式
  • Java并发编程实战 Day 3:volatile关键字与内存可见性
  • C++学习-入门到精通【12】文件处理
  • 工作流引擎-09-XState 是一个 JavaScript 和 TypeScript 的状态管理库,它使用状态机和状态图来建模逻辑。
  • Hilbert曲线
  • 在Flutter中定义全局对象(如$http)而不需要import
  • vue3: baidusubway using typescript
  • MYOJ_4149:(洛谷P1002)[NOIP 2002 普及组] 过河卒(坐标型DP)
  • 在Mathematica中可视化Root和Log函数
  • 实现RabbitMQ多节点集群搭建
  • 前端框架进化史
  • Git仓库大文件清理指南
  • LangChain-结合GLM+SQL+函数调用实现数据库查询(二)
  • Spring如何实现组件扫描与@Component注解原理
  • vscode 连接远程服务器