asyncpg - Python异步PostgreSQL客户端库
文章目录
- 一、关于asyncpg
- 1、项目概览
- 2、相关链接资源
- 3、功能特性
- 二、安装配置
- 三、基本用法
- 四、asyncpg 使用指南
- 1、类型转换
- 2、自定义类型转换
- 示例:自动 JSON 转换
- 示例:复杂类型
- 示例:PostGIS 类型的自动转换
- 示例:将数值列解码为浮点数
- 示例:解码 hstore 值
- 3、事务处理
- 4、连接池
- 五、常见问题解答
- asyncpg 支持 DB-API 吗?
- 能否将 asyncpg 与 SQLAlchemy ORM 结合使用?
- 能否对 `asyncpg.Record` 使用点号访问?这样看起来更简洁。
- 为什么不能在事务外使用 cursor ?
- 为什么会出现预处理语句错误?
- 为什么使用 `expression IN $1` 时会报 `PostgresSyntaxError` 错误?
一、关于asyncpg
1、项目概览
asyncpg 是专为PostgreSQL和Python/asyncio设计的数据库接口库,通过原生实现PostgreSQL服务器二进制协议,为Python的asyncio
框架提供高效简洁的数据库访问方案。
技术定位:
- 性能:平均比psycopg3快5倍
- 协议支持:PostgreSQL 9.5至17版本
- 语言要求:Python 3.8+
2、相关链接资源
- GitHub:https://github.com/MagicStack/asyncpg
- 官方文档:https://magicstack.github.io/asyncpg/current/
- PyPI:https://pypi.python.org/pypi/asyncpg
- 性能测试工具:https://github.com/MagicStack/pgbench
- 技术博客:http://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/
- License:Apache 2.0
状态标识:
- 测试状态:https://github.com/MagicStack/asyncpg/actions?query=workflow%3ATests+branch%3Amaster
- PyPI版本:https://img.shields.io/pypi/v/asyncpg.svg
- 官方性能白皮书:https://gistpreview.github.io/?0ed296e93523831ea0918d42dd1258c2
3、功能特性
1、协议级功能支持
- 原生实现PostgreSQL协议
- 支持预处理语句和可滚动游标
2、高级数据类型处理
- 复合类型自动编解码
- 数组及嵌套结构支持
3、查询控制
- 结果集部分迭代
- 自定义数据类型支持
二、安装配置
asyncpg 可通过 PyPI 获取。当不使用 GSSAPI/SSPI 认证时,它没有任何依赖项。使用 pip 进行安装:
$ pip install asyncpg
如果需要 GSSAPI/SSPI 认证,请使用:
$ pip install 'asyncpg[gssauth]'
更多详情,请参阅文档。
三、基本用法
import asyncio
import asyncpgasync def run():conn = await asyncpg.connect(user='user', password='password',database='database', host='127.0.0.1')values = await conn.fetch('SELECT * FROM mytable WHERE id = $1',10,)await conn.close()asyncio.run(run())
四、asyncpg 使用指南
与数据库的交互通常从调用 connect()
开始,该方法会建立一个新的数据库会话并返回一个 Connection
实例。该连接实例提供了执行查询和管理事务的方法。
import asyncio
import asyncpg
import datetimeasync def main():# Establish a connection to an existing database named "test"# as a "postgres" user.conn = await asyncpg.connect('postgresql://postgres@localhost/test')# Execute a statement to create a new table.await conn.execute('''CREATE TABLE users(id serial PRIMARY KEY,name text,dob date)''')# Insert a record into the created table.await conn.execute('''INSERT INTO users(name, dob) VALUES($1, $2)''', 'Bob', datetime.date(1984, 3, 1))# Select a row from the table.row = await conn.fetchrow('SELECT * FROM users WHERE name = $1', 'Bob')# *row* now contains# asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1))# Close the connection.await conn.close()asyncio.run(main())
注意:asyncpg 使用 PostgreSQL 原生语法 $n
作为查询参数。
1、类型转换
asyncpg 能够自动将 PostgreSQL 类型转换为对应的 Python 类型,反之亦然。所有标准数据类型都开箱即用,包括数组、复合类型、范围类型、枚举及其任意组合。对于非标准类型,可以自定义编解码器,或覆盖标准编解码器。更多信息请参阅自定义类型转换。
下表展示了 PostgreSQL 与 Python 类型的对应关系:
PostgreSQL 类型 | Python 类型 |
---|---|
anyarray | list |
anyenum | str |
anyrange | asyncpg.Range , tuple |
anymultirange | list[ asyncpg.Range ] , list[ tuple ] 1 |
record | asyncpg.Record , tuple , Mapping |
bit , varbit | asyncpg.BitString |
bool | bool |
box | asyncpg.Box |
bytea | bytes |
char , name , varchar , text , xml | str |
cidr | ipaddress.IPv4Network , ipaddress.IPv6Network |
inet | ipaddress.IPv4Interface , ipaddress.IPv6Interface , ipaddress.IPv4Address , ipaddress.IPv6Address 2 |
macaddr | str |
circle | asyncpg.Circle |
date | datetime.date |
time | 无时区偏移的 datetime.time |
time with time zone | 有时区偏移的 datetime.time |
timestamp | 无时区偏移的 datetime.datetime |
timestamp with time zone | 有时区偏移的 datetime.datetime |
interval | datetime.timedelta |
float , double precision | float 3 |
smallint , integer , bigint | int |
numeric | Decimal |
json , jsonb | str |
line | asyncpg.Line |
lseg | asyncpg.LineSegment |
money | str |
path | asyncpg.Path |
point | asyncpg.Point |
polygon | asyncpg.Polygon |
uuid | uuid.UUID |
tid | tuple |
其他所有类型默认按文本格式编码/解码。
[1] 自 0.25.0 版本起支持
[2] 在 0.20.0 版本之前,asyncpg 错误地将带前缀的 inet
值处理为 IPvXNetwork
而非 IPvXInterface
[3] 单精度 float
值解码为 Python float 时可能存在精度差异,这是有限精度浮点类型的固有特性。如需精确匹配十进制表示,请在查询中将表达式转换为 double
或 numeric
类型。
2、自定义类型转换
asyncpg 允许通过 Connection.set_type_codec()
和 Connection.set_builtin_type_codec()
方法为标准类型及用户自定义类型定义类型转换函数。
示例:自动 JSON 转换
以下示例展示了如何配置 asyncpg,使其使用 json
模块对 JSON 值进行编码和解码。
import asyncio
import asyncpg
import jsonasync def main():conn = await asyncpg.connect()try:await conn.set_type_codec('json',encoder=json.dumps,decoder=json.loads,schema='pg_catalog')data = {'foo': 'bar', 'spam': 1}res = await conn.fetchval('SELECT $1::json', data)finally:await conn.close()asyncio.run(main())
示例:复杂类型
以下示例展示了如何配置 asyncpg,将 Python 的 complex
值编码和解码为 PostgreSQL 中的自定义复合类型。
import asyncio
import asyncpgasync def main():conn = await asyncpg.connect()try:await conn.execute('''CREATE TYPE mycomplex AS (r float,i float);''')await conn.set_type_codec('complex',encoder=lambda x: (x.real, x.imag),decoder=lambda t: complex(t[0], t[1]),format='tuple',)res = await conn.fetchval('SELECT $1::mycomplex', (1+2j))finally:await conn.close()asyncio.run(main())
示例:PostGIS 类型的自动转换
以下示例展示了如何配置 asyncpg 来编码和解码 PostGIS 的 geometry
类型。该方案适用于任何符合 geo 接口规范 的 Python 对象,并依赖于 Shapely 库实现,但任何支持读写 WKB 格式的库均可使用。
import asyncio
import asyncpgimport shapely.geometry
import shapely.wkb
from shapely.geometry.base import BaseGeometryasync def main():conn = await asyncpg.connect()try:def encode_geometry(geometry):if not hasattr(geometry, '__geo_interface__'):raise TypeError('{g} does not conform to ''the geo interface'.format(g=geometry))shape = shapely.geometry.shape(geometry)return shapely.wkb.dumps(shape)def decode_geometry(wkb):return shapely.wkb.loads(wkb)await conn.set_type_codec('geometry', # also works for 'geography'encoder=encode_geometry,decoder=decode_geometry,format='binary',)data = shapely.geometry.Point(-73.985661, 40.748447)res = await conn.fetchrow('''SELECT 'Empire State Building' AS name,$1::geometry AS coordinates''',data)print(res)finally:await conn.close()asyncio.run(main())
示例:将数值列解码为浮点数
默认情况下,asyncpg 会将数值列解码为 Python Decimal
实例。以下示例展示如何配置 asyncpg 改用浮点数进行解码。
import asyncio
import asyncpgasync def main():conn = await asyncpg.connect()try:await conn.set_type_codec('numeric', encoder=str, decoder=float,schema='pg_catalog', format='text')res = await conn.fetchval("SELECT $1::numeric", 11.123)print(res, type(res))finally:await conn.close()asyncio.run(main())
示例:解码 hstore 值
hstore 是一种用于存储键/值对的扩展数据类型。asyncpg 内置了一个编解码器,可以将 hstore 值解码为 dict
对象或进行反向编码。
由于 hstore
不是内置类型,必须通过 Connection.set_builtin_type_codec()
方法在连接上注册该编解码器:
import asyncpg
import asyncioasync def run():conn = await asyncpg.connect()# Assuming the hstore extension exists in the public schema.await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")assert result == {'a': '1', 'b': '2', 'c': None}asyncio.run(run())
3、事务处理
要创建事务,应使用 Connection.transaction()
方法。
最常见的使用事务方式是通过 async with
语句:
async with connection.transaction():await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")
注意:当不在显式事务块中时,对数据库的任何修改都会立即生效。这种行为也被称为自动提交。
更多信息请参阅事务 API文档。
4、连接池
对于服务器类应用而言,这类应用需要频繁处理请求,且在请求处理期间仅需短暂使用数据库连接,此时推荐使用连接池。asyncpg 提供了高级的池化实现,无需依赖外部连接池工具(如 PgBouncer)。
要创建连接池,请使用 asyncpg.create_pool()
函数。随后可通过生成的 Pool
对象从池中借用连接。
以下示例展示了如何用 asyncpg 实现一个计算二次幂的简单 Web 服务。
import asyncio
import asyncpg
from aiohttp import webasync def handle(request):"""Handle incoming requests."""pool = request.app['pool']power = int(request.match_info.get('power', 10))# Take a connection from the pool.async with pool.acquire() as connection:# Open a transaction.async with connection.transaction():# Run the query passing the request argument.result = await connection.fetchval('select 2 ^ $1', power)return web.Response(text="2 ^ {} is {}".format(power, result))async def init_db(app):"""Initialize a connection pool."""app['pool'] = await asyncpg.create_pool(database='postgres',user='postgres')yieldawait app['pool'].close()def init_app():"""Initialize the application server."""app = web.Application()# Create a database contextapp.cleanup_ctx.append(init_db)# Configure service routesapp.router.add_route('GET', '/{power:\d+}', handle)app.router.add_route('GET', '/', handle)return appapp = init_app()
web.run_app(app)
请参阅 Connection Pools API 文档获取更多信息。
五、常见问题解答
asyncpg 支持 DB-API 吗?
不支持。DB-API 是一个同步 API,而 asyncpg 基于异步 I/O 模型构建。因此,无法实现与 DB-API 的完全兼容,我们决定将 asyncpg 的 API 设计得更贴近 PostgreSQL 的架构和术语。未来某个时间点,我们会发布一个同步的、兼容 DB-API 的 asyncpg 版本。
能否将 asyncpg 与 SQLAlchemy ORM 结合使用?
可以。SQLAlchemy 1.4 及更高版本原生支持 asyncpg 方言,具体用法请参考其官方文档。对于较旧版本的 SQLAlchemy,可通过第三方适配器(如 asyncpgsa 或 databases)实现集成。
能否对 asyncpg.Record
使用点号访问?这样看起来更简洁。
我们决定不将 asyncpg.Record
实现为具名元组,因为需要保持 Record
方法命名空间与列命名空间的隔离。不过,你可以通过 connect()
或任何返回 Record 的方法的 record_class
参数,提供一个支持点号访问的自定义 Record
类。
class MyRecord(asyncpg.Record):def __getattr__(self, name):return self[name]
为什么不能在事务外使用 cursor ?
通过调用 Connection.cursor()
或 PreparedStatement.cursor()
创建的游标不能在事务外使用。任何此类尝试都会导致 InterfaceError
。如需创建可在事务外使用的游标,请直接使用 DECLARE ... CURSOR WITH HOLD
SQL 语句。
为什么会出现预处理语句错误?
如果你间歇性遇到 prepared statement "__asyncpg_stmt_xx__" does not exist
或 prepared statement “__asyncpg_stmt_xx__” already exists
错误,很可能是因为你没有直接连接 PostgreSQL 服务器,而是通过 pgbouncer 进行连接。当 pgbouncer 处于 "transaction"
或 "statement"
连接池模式时,不支持预处理语句。你有以下几种解决方案:
- 如果使用 pgbouncer 只是为了降低新建连接的开销(而不是为了通过连接池支持大量客户端以实现更好的扩展性),可以改用 asyncpg 提供的连接池功能,这会是更合适的选择;
- 通过向
asyncpg.connect()
和asyncpg.create_pool()
传递statement_cache_size=0
参数来禁用自动使用预处理语句(同时避免使用Connection.prepare()
); - 将 pgbouncer 的
pool_mode
切换为session
模式。
为什么使用 expression IN $1
时会报 PostgresSyntaxError
错误?
expression IN $1
不是有效的 PostgreSQL 语法。若要对一个序列进行值检查,请使用 expression = any($1::mytype[])
语法,其中 mytype
表示数组元素类型。
伊织 xAI 2025-06-01(日)