在 Python 中使用 sqlalchemy 来操作数据库的几个小总结
创始人
2024-11-14 13:06:14
0

在探索使用 FastAPI, SQLAlchemy, Pydantic,Redis, JWT 构建的项目的时候,其中数据库访问采用SQLAlchemy,并采用异步方式。数据库操作和控制器操作,采用基类继承的方式减少重复代码,提高代码复用性。在这个过程中设计接口和测试的时候,对一些问题进行跟踪解决,并记录供参考。

1、SQLAlchemy事务处理

在异步环境中,批量更新操作需要使用异步方法来执行查询和提交事务。

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:     """批量更新对象"""     try:         async with db.begin():  # 使用事务块确保批量操作的一致性             for obj_in in obj_in_list:                 # 查询对象                 query = select(self.model).filter(self.model.id == obj_in.id)                 result = await db.execute(query)                 db_obj = result.scalars().first()                                  if db_obj:                     # 获取更新数据                     update_data = obj_in.model_dump(skip_defaults=True)                                          # 更新对象字段                     for field, value in update_data.items():                         setattr(db_obj, field, value)                  return True     except SQLAlchemyError as e:         print(e)         # 异常处理时,事务会自动回滚         return False

在这个改进后的代码中:

  1. 事务块:使用 async with db.begin() 创建事务块,以确保批量操作的一致性。事务块会在操作完成后自动提交,并在出现异常时回滚。

  2. 查询对象:使用 select(self.model).filter(self.model.id == obj_in.id) 进行异步查询,并使用 await db.execute(query) 执行查询。

  3. 更新对象字段:用 setattr 更新对象的字段。

  4. 异常处理:捕获 SQLAlchemyError 异常,并在异常发生时回滚事务。事务块会自动处理回滚,因此不需要手动回滚。

这种方式确保了在异步环境中批量更新操作的正确性和一致性。

在使用 async with db.begin() 进行事务管理时,事务会自动提交。如果在事务块内执行的所有操作都成功,事务会在退出时自动提交;如果出现异常,事务会自动回滚。

因此,手动调用 await db.commit() 是不必要的,因为事务块会处理这些操作。如果你不使用事务块,并希望手动控制事务的提交,可以如下修改:

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:     """批量更新对象"""     try:         for obj_in in obj_in_list:             query = select(self.model).filter(self.model.id == obj_in.id)             result = await db.execute(query)             db_obj = result.scalars().first()                          if db_obj:                 update_data = obj_in.model_dump(skip_defaults=True)                                  for field, value in update_data.items():                     setattr(db_obj, field, value)                  await db.commit()  # 手动提交事务         return True     except SQLAlchemyError as e:         print(e)         await db.rollback()  # 确保在出错时回滚事务         return False

在这个手动提交事务的例子中:

  • 在更新对象的操作完成后,使用 await db.commit() 来提交事务。

  • 如果发生异常,使用 await db.rollback() 来回滚事务。

根据需求选择合适的方法进行事务管理。事务块方式通常是更安全和简洁的选择。

在异步环境中,create_update 方法需要对数据库进行异步查询、更新或创建操作。

async def create_update(     self, obj_in: DtoType, id: PrimaryKeyType, db: AsyncSession ) -> bool:     """创建或更新对象"""     try:         # 查询对象         query = select(self.model).filter(self.model.id == id)         result = await db.execute(query)         db_obj = result.scalars().first()                  if db_obj:             # 更新对象             return await self.update(obj_in, db)         else:             # 创建对象             return await self.create(obj_in, db)     except SQLAlchemyError as e:         print(e)         # 确保在出错时回滚事务         await db.rollback()         return False

在这个代码中:

  1. 异步查询:使用 select(self.model).filter(self.model.id == id) 来构建查询,并用 await db.execute(query) 执行查询。

  2. 获取对象:使用 result.scalars().first() 来获取查询结果中的第一个对象。

  3. 调用更新或创建方法:根据查询结果的有无,分别调用 self.update 或 self.create 方法。确保这两个方法都是异步的,并在调用时使用 await

  4. 异常处理:捕获 SQLAlchemyError 异常,并在发生异常时使用 await db.rollback() 来回滚事务。

在异步环境中,批量插入对象通常需要使用异步方法来执行数据库操作。由于 bulk_insert_mappings 在 SQLAlchemy 的异步版本中可能不直接支持,你可以使用 add_all 方法来批量添加对象。

async def save_import(self, data: List[DtoType], db: AsyncSession) -> bool:     """批量导入对象"""     try:         # 将 DTO 转换为模型实例         db_objs = [self.model(**obj_in.model_dump()) for obj_in in data]                  # 批量添加对象         db.add_all(db_objs)                  # 提交事务         await db.commit()                  return True     except SQLAlchemyError as e:         print(e)         await db.rollback()  # 确保在出错时回滚事务         return False

代码说明:

  1. 转换 DTO 为模型实例:使用 [self.model(**obj_in.model_dump()) for obj_in in data] 将 data 列表中的 DTO 转换为模型实例列表。

  2. 批量添加对象:使用 db.add_all(db_objs) 批量添加对象到数据库会话。

  3. 提交事务:使用 await db.commit() 异步提交事务。

  4. 异常处理:捕获 SQLAlchemyError 异常,使用 await db.rollback() 回滚事务以确保在出错时数据库状态的一致性。

这种方式确保了在异步环境中正确地进行批量导入操作,并处理可能出现的异常。

2、在 SQLAlchemy 中select(...).where(...) 和 select(...).filter(...)的差异

在 SQLAlchemy 中,select(...).where(...) 和 select(...).filter(...) 都用于构造查询条件,但它们有一些细微的差别和适用场景。

1. where(...)
  • 定义where 是 SQLAlchemy 中 select 对象的方法,用于添加查询的条件。

  • 用法query = select(self.model).where(self.model.id == id)

  • 描述where 方法用于指定 SQL WHERE 子句的条件。在大多数情况下,它的行为和 filter 是等效的。

2. filter(...)
  • 定义filter 是 SQLAlchemy 中 Query 对象的方法,用于添加查询的条件。

  • 用法query = select(self.model).filter(self.model.id == id)

  • 描述filter 方法也用于指定 SQL WHERE 子句的条件。它通常用于更复杂的查询构建中,尤其是在 ORM 查询中。

主要差异

  • 上下文where 是 select 对象的一部分,通常用于构建 SQL 查询(SQLAlchemy Core)。而 filter 是 Query 对象的一部分,通常用于 ORM 查询(SQLAlchemy ORM)。然而,在 SQLAlchemy 2.0+ 中,select 和 filter 的使用变得更加一致。

  • 语义:在使用 SQLAlchemy Core 时,where 更加明确地表示你正在添加 SQL 语句中的 WHERE 子句。在 ORM 查询中,filter 也做了类似的事情,但它提供了更多 ORM 相关的功能。

使用 where 的示例(SQLAlchemy Core):

from sqlalchemy.future import select from sqlalchemy.ext.asyncio import AsyncSession  async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:     query = select(self.model).where(self.model.id == id)     result = await db.execute(query)     return result.scalars().first()

使用 filter 的示例(SQLAlchemy ORM):

from sqlalchemy.orm import sessionmaker  async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:     query = select(self.model).filter(self.model.id == id)     result = await db.execute(query)     return result.scalars().first()

总结
  • 在 SQLAlchemy Core 中where 是构建查询条件的标准方法。

  • 在 SQLAlchemy ORM 中filter 用于构建查询条件,但在 Core 中,filter 的使用相对较少。

在 SQLAlchemy 2.0 及更高版本中,select 的 where 和 filter 的用法变得越来越一致,你可以根据自己的习惯和需求选择其中一种。在实际开发中,选择哪一种方法通常取决于你的代码上下文和个人偏好。

3、model_dump(exclude_unset=True) 和model_dump(skip_defaults=True)有什么差异

model_dump(exclude_unset=True) 和 model_dump(skip_defaults=True) 是用于处理模型实例的序列化方法,它们的用途和行为略有不同。这两个方法通常用于将模型实例转换为字典,以便进行进一步的处理或传输。

model_dump(exclude_unset=True)

exclude_unset=True 是一个选项,通常用于序列化方法中,表示在转换模型实例为字典时,排除那些未设置的字段。

  • 功能:排除所有未显式设置(即使用默认值)的字段。

  • 使用场景:适用于需要忽略那些未被用户设置的字段,以避免在输出中包含默认值。

# 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值 model_instance = MyModel(name='Alice', age=25) # 如果 age 的默认值是 0, exclude_unset=True 将只包含 'name' serialized_data = model_instance.model_dump(exclude_unset=True)

model_dump(skip_defaults=True)

skip_defaults=True 是另一个选项,表示在转换模型实例为字典时,排除那些使用了默认值的字段。

  • 功能:排除所有字段的值等于其默认值的字段。

  • 使用场景:适用于需要排除那些显式设置为默认值的字段,以减少输出的冗余信息。

# 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值 model_instance = MyModel(name='Alice', age=25) # 如果 age 的默认值是 0, skip_defaults=True 将只包含 'name' serialized_data = model_instance.model_dump(skip_defaults=True)

主要区别
  • 排除条件

    exclude_unset=True 排除那些在模型实例中未显式设置的字段(即字段值为默认值或未赋值)。

    skip_defaults=True 排除那些字段值等于其默认值的字段。

  • 适用场景

    使用 exclude_unset=True 时,目的是排除那些在实例化过程中未被显式赋值的字段,这通常用于避免包含那些尚未配置的字段。

    使用 skip_defaults=True 时,目的是去掉那些显式设置为默认值的字段,以避免输出不必要的信息。

4、使用**kwargs 参数,在接口中实现数据软删除的处理

例如我们在删除接口中,如果传递了 kwargs 参数,则进行软删除(更新记录),否则进行硬删除(删除记录)。

async def delete_byid(self, id: PrimaryKeyType, db: AsyncSession, **kwargs) -> bool:         """根据主键删除一个对象          :param kwargs: for soft deletion only         """         if not kwargs:             result = await db.execute(sa_delete(self.model).where(self.model.id == id))         else:             result = await db.execute(                 sa_update(self.model).where(self.model.id == id).values(**kwargs)             )          await db.commit()         return result.rowcount > 0

实例代码如下所示。

# 示例模型 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Boolean  Base = declarative_base()  class Customer(Base):     __tablename__ = 'customer'     id = Column(Integer, primary_key=True)     name = Column(String)     is_deleted = Column(Boolean, default=False)  # 示例使用 async def main():     async with AsyncSession(engine) as session:         controller = BaseController(Customer)                  # 硬删除         result = await controller.delete_byid(1, session)         print(f"Hard delete successful: {result}")                  # 软删除         result = await controller.delete_byid(2, session, is_deleted=True)         print(f"Soft delete successful: {result}")  # 确保运行主程序 import asyncio if __name__ == "__main__":     asyncio.run(main())

注意事项

  1. 模型定义:确保你的模型中包含 is_deleted 字段,并且字段名正确。

  2. 传递参数:在调用 delete_byid 方法时,正确传递 kwargs 参数。例如,如果你要进行软删除,可以传递 is_deleted=True

  3. 调试输出:你可以添加一些调试输出(如 print(kwargs)),以确保正确传递了参数。

# 示例硬删除调用 await controller.delete_byid(1, session)  # 示例软删除调用 await controller.delete_byid(2, session, is_deleted=True)

如果我们的is_deleted 字段是Int类型的,如下所示,那么处理有所不同

class Customer(Base):     __tablename__ = "t_customer"      id = Column(String, primary_key=True, comment="主键")     name = Column(String, comment="姓名")     age = Column(Integer, comment="年龄")     creator = Column(String, comment="创建人")     createtime = Column(DateTime, comment="创建时间")     is_deleted = Column(Integer, comment="是否删除")

操作代码

# 硬删除         result = await controller.delete_byid("1", session)         print(f"Hard delete successful: {result}")                  # 软删除         result = await controller.delete_byid("2", session, is_deleted=1)         print(f"Soft delete successful: {result}")

注意事项

  1. 模型定义:你的 Customer 模型定义看起来是正确的,确保所有字段和注释都符合你的要求。

  2. 硬删除和软删除

    硬删除:直接从数据库中删除记录。

    软删除:通过更新 is_deleted 字段来标记记录为已删除,而不是实际删除记录。

  3. 正确传递参数

    硬删除时,不需要传递额外参数。

    软删除时,传递 is_deleted=1 作为参数。

通过确保正确传递参数并且模型包含正确的字段,你应该能够正确执行软删除和硬删除操作。

5、Python处理接口的时候,Iterable 和List有什么差异

在 Python 中,Iterable 和 List 是两个不同的概念,它们有各自的特点和用途:

Iterable

Iterable 是一个更广泛的概念,指的是任何可以返回一个迭代器的对象。迭代器是一个实现了 __iter__() 方法的对象,能够逐个返回元素。几乎所有的容器类型(如列表、元组、字典、集合等)都是可迭代的。要检查一个对象是否是可迭代的,可以使用 collections.abc.Iterable 来进行检查。

特点

  • 通用性Iterable 是一个通用的接口,表示对象可以被迭代。

  • 惰性:一些 Iterable 可能是惰性计算的(如生成器),即它们不会立即计算所有元素,而是按需生成元素。

  • 示例:列表(List)、元组(Tuple)、字典(Dict)、集合(Set)、生成器(Generator)等都是可迭代对象。

from collections.abc import Iterable  print(isinstance([1, 2, 3], Iterable))  # True print(isinstance((1, 2, 3), Iterable))  # True print(isinstance({1, 2, 3}, Iterable))  # True print(isinstance({'a': 1}, Iterable))   # True print(isinstance((x for x in range(3)), Iterable))  # True

List

List 是 Python 中的一种具体的容器类型,表示一个有序的元素集合,可以包含重复的元素。它是最常用的可变序列类型之一,支持索引访问、切片操作以及其他多种方法来操作列表中的元素。

特点

  • 具体实现List 是一个具体的类型,表示一个动态数组,可以存储多个对象。

  • 有序:列表保持元素的插入顺序。

  • 可变:可以对列表中的元素进行修改(如添加、删除、更新)。

  • 示例[1, 2, 3] 是一个列表。

my_list = [1, 2, 3] print(my_list)  # [1, 2, 3] my_list.append(4)  # [1, 2, 3, 4] my_list[0] = 10  # [10, 2, 3, 4]

总结一下:

  • Iterable:一个广泛的概念,表示可以被迭代的对象,不一定是具体的数据结构。例如,生成器是可迭代的但不是列表。

  • List:一个具体的容器类型,是一种有序的可变集合。列表是 Iterable 的一种实现,但并不是所有的 Iterable 都是列表。

Iterable 是一个抽象概念,而 List 是一个具体的实现。你可以在 List 之上使用许多操作和方法来处理数据,而 Iterable 主要关注的是是否可以进行迭代。

因此接收结合的处理,我们可以使用Iterable接口更加通用一些。

async def create_range(         self, obj_in_list: Iterable[DtoType], db: AsyncSession     ) -> bool:         """批量创建对象"""         try:             # 将 DTO 转换为模型实例             db_objs = [self.model(**obj_in.model_dump()) for obj_in in obj_in_list]              # 批量添加到数据库             db.add_all(db_objs)             await db.commit()             return True         except SQLAlchemyError as e:             print(e)             await db.rollback()  # 确保在出错时回滚事务             return False

以上就是在Python中使用sqlalchemy来操作数据库的时候,对一些小问题的总结,供大家参考。

文章转载自:伍华聪

原文链接:https://www.cnblogs.com/wuhuacong/p/18336423

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

相关内容

热门资讯

玩家必看教程!wepoke挂真... 玩家必看教程!wepoke挂真的,wepoke智能ai可以意思,解密教程(有挂细节)-哔哩哔哩;1、...
大家学习交流!(Wepoke实... 【福星临门,好运相随】;大家学习交流!(Wepoke实测)软件透明挂辅助透视!(x-poker)解密...
3分钟精通!大菠萝手游辅助,有... 3分钟精通!大菠萝手游辅助,有没有人wepoker(透视)规律教程(今日头条)大菠萝手游辅助软件透明...
第三方技巧!德州wpk德州真的... 第三方技巧!德州wpk德州真的,wpk ai辅助靠谱,爆料教程(了解有挂)-哔哩哔哩;是一款可以让一...
透视有挂!(哈糖大菠萝)外挂辅... 透视有挂!(哈糖大菠萝)外挂辅助科技!(微扑克辅助挂)黑科技教程(2022已更新)(哔哩哔哩);微扑...
黑科技教程!wepoke辅助是... 黑科技教程!wepoke辅助是真的,wpk透视外挂会可以样,线上教程(了解有挂)-哔哩哔哩;wpk透...
第五分钟科普!aapoker透... 第五分钟科普!aapoker透视方法,微扑克微乐辅助(透视)必备教程(有挂分析)1、微扑克微乐辅助系...
盘点几款!(红龙软件德州扑克)... 盘点几款!(红龙软件德州扑克)外挂透明挂ai辅助脚本!(鱼扑克app俱乐部)介绍教程(2026已更新...
第6分钟体悟!wepoker黑... 第6分钟体悟!wepoker黑侠辅助器正版下载,德普之星有透视辅助吗(透视)透牌教程(有挂总结)1)...
实测必看!微扑克app发牌规律... 实测必看!微扑克app发牌规律,德州ai辅助,存在挂教程(有挂秘籍)-哔哩哔哩;亲,有的,ai轻松简...