Source code for minos.aggregate.transactions.repositories.database.impl

from __future__ import (
    annotations,
)

from typing import (
    AsyncIterator,
    Optional,
)

from minos.common import (
    DatabaseMixin,
    ProgrammingException,
)

from ....exceptions import (
    TransactionRepositoryConflictException,
)
from ...entries import (
    TransactionEntry,
)
from ..abc import (
    TransactionRepository,
)
from .factories import (
    TransactionDatabaseOperationFactory,
)


[docs]class DatabaseTransactionRepository(DatabaseMixin[TransactionDatabaseOperationFactory], TransactionRepository): """Database Transaction Repository class."""
[docs] def __init__(self, *args, database_key: Optional[tuple[str]] = None, **kwargs): if database_key is None: database_key = ("aggregate", "transaction") super().__init__(*args, database_key=database_key, **kwargs)
async def _setup(self): operation = self.database_operation_factory.build_create() await self.execute_on_database(operation) async def _submit(self, transaction: TransactionEntry) -> TransactionEntry: operation = self.database_operation_factory.build_submit( **transaction.as_raw(), ) try: updated_at = await self.execute_on_database_and_fetch_one(operation) except ProgrammingException: raise TransactionRepositoryConflictException( f"{transaction!r} status is invalid respect to the previous one." ) transaction.updated_at = updated_at return transaction async def _select(self, streaming_mode: Optional[bool] = None, **kwargs) -> AsyncIterator[TransactionEntry]: operation = self.database_operation_factory.build_query(**kwargs) async for row in self.execute_on_database_and_fetch_all(operation, streaming_mode=streaming_mode): yield TransactionEntry(*row, transaction_repository=self)