sqlachemy
from sqlalchemy import create_engine, String, select, update, delete
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session# 定义模型基类class Base(DeclarativeBase):pass# 定义用户模型class User(Base):__tablename__ = "user1"id: Mapped[int] = mapped_column(primary_key=True)name: Mapped[str] = mapped_column(String(30))email: Mapped[str] = mapped_column(String(50), unique=True)def __repr__(self) -> str:return f"User(id={self.id!r}, name={self.name!r}, email={self.email!r})"# 创建数据库引擎(使用SQLite内存数据库)
engine = create_engine("postgresql://postgres:carizon@10.11.96.70:5432/hdmap_check", echo=True)# 创建表
Base.metadata.create_all(engine)# 使用上下文管理器创建会话
with Session(engine) as session:# 1. 增加(Create) - 添加新用户# print("=== 创建用户 ===")# new_user = User(name="张三", email="zhangsan@example.com")# new_user2 = User(name="张三2", email="zhangsan@example.com")# # session.add(new_user)# session.add_all([new_user,new_user2])# session.commit()# print(f"已创建用户: {new_user}")# # 添加更多用户# users_data = [# User(name="李四", email="lisi@example.com"),# User(name="王五", email="wangwu@example.com"),# User(name="赵六", email="zhaoliu@example.com")# ]# session.add_all(users_data)# session.commit()# print("已批量添加用户")# # 2. 查询(Read) - 获取所有用户# print("\n=== 查询所有用户 ===")# stmt = select(User)# users = session.execute(stmt).scalars().all()# for user in users:# print(user)# # 查询特定用户# print("\n=== 查询特定用户 ===")# stmt = select(User).where(User.name == "张三2")# # print("sql=",stmt)# # user = session.execute(stmt).scalar_one()# # user = session.execute(stmt).scalar_one_or_none()# user = session.execute(stmt).first()# print(f"找到用户: {user}")# 3. 更新(Update) - 修改用户信息print("\n=== 更新用户 ===")stmt = update(User).where(User.name == "张三2").values(email="wangwu_updated@example.com", name="张三_updated")session.execute(stmt)session.commit()# 验证更新stmt = select(User).where(User.name == "张三_updated")updated_user = session.execute(stmt).scalar_one()updated_user.email="sss"session.commit()print(f"更新后的用户: {updated_user}")# # 4. 删除(Delete) - 删除用户# print("\n=== 删除用户 ===")# stmt = delete(User).where(User.name == "赵六")# session.execute(stmt)# session.commit()# # 验证删除# stmt = select(User)# remaining_users = session.execute(stmt).scalars().all()# print("删除后的剩余用户:")# for user in remaining_users:# print(user)# # 使用事务的另一种方式
# print("\n=== 使用事务操作 ===")
# with Session(engine) as session:
# with session.begin():
# # 在事务中执行多个操作
# new_user = User(name="钱七", email="qianqi@example.com")
# session.add(new_user)# # 更新操作
# stmt = update(User).where(User.name == "张三").values(email="zhangsan_updated@example.com")
# session.execute(stmt)# # 事务已提交,验证结果
# users = session.execute(select(User)).scalars().all()
# print("事务操作后的所有用户:")
# for user in users:
# print(user)