sqlalchemy 事务自动控制(类java aop)

发布时间:2024年01月11日

最近使用它交互数据库,想实现类似java aop那种自动事务控制,不用手动commit或者rollback。我是用的是flask+denpendency-injecter

?这是我的db的配置类,里面会初始化一些session配置,里面比较重要的是把autocommit和autoflush关闭了,因为我们的代码会来处理这个,还有就是把expire_on_commit设置为flase,否则你commit之后,再取用某个entity就会报错了,例如你新建了一个entity,这个时候会更新他的id,返回给前端的时候就会报错了(Error Messages — SQLAlchemy 2.0 Documentation)。

"""Database module."""

from contextlib import contextmanager, AbstractContextManager
from typing import Callable

from sqlalchemy import create_engine, orm
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
Base = declarative_base()

class DatabaseConfig:

    def __init__(self, db_url: str) -> None:
        self._engine = create_engine(db_url, echo=True)
        self._session_factory = orm.scoped_session(
            orm.sessionmaker(
                autocommit=False,
                autoflush=False,
                expire_on_commit=False,
                bind=self._engine,
            ),
        )

    def create_database(self) -> None:
        Base.metadata.create_all(self._engine)


    @contextmanager
    def session(self) -> Callable[..., AbstractContextManager[Session]]:
        session: Session = self._session_factory()
        try:
            yield session
        except Exception:
            session.rollback()
            raise
        else:
            if session._transaction.is_active:
                session.commit()
            session.close()

然后comtextmanger里面就是我们的处理代码了,我们主要依靠with代码块来控制,在yield之前的属于__init__,在yield之后属于__exit__,也就是当with代码块结束之前,如果发生任何报错,我们都会进行rollback操作,并且raise(这部分需要error handler来做了,这里就不赘述了),然后如果什么错误都没有发生,就检测transaction是否还是active,如果是就commit,然后关闭session。

?然后在Container中注入session?contextmanager。

class Container(containers.DeclarativeContainer):
  
    wiring_config = containers.WiringConfiguration(packages=[
        "main"
    ])
    config = providers.Configuration(yaml_files=["config.yml"])
 
    db=providers.Singleton(DatabaseConfig,db_url=config.db.url)
    
    user_repository = providers.Factory(
        UserRepositoryImpl
    )
    
    user_service = providers.Factory(
        UserService,
        user_repository=user_repository,
        session_factory=db.provided.session
    )

然后再service层使用with代码块控制transation ,整个逻辑包含在同一个with中就行了。

class UserService:

    @inject
    def __init__(self, user_repository: UserRepository, session_factory: Callable[..., AbstractContextManager[Session]]) -> None:
        self._repository: UserRepository = user_repository
        self.session_factory=session_factory

    def create_user(self,user) -> User:
        with self.session_factory() as session:
            return self._repository.add(
                session=session,
                user=user
            )
    

然后在repo里面写具体代码就行了

class UserRepositoryImpl(UserRepository):

    def __init__(self) -> None:
        pass

    def add(self, user,session):
        session.add(user)
        return user

文章来源:https://blog.csdn.net/Damien_J_Scott/article/details/135519874
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。