Source code for minos.aggregate.transactions.repositories.abc

from __future__ import (
    annotations,
)

from abc import (
    ABC,
    abstractmethod,
)
from datetime import (
    datetime,
)
from typing import (
    AsyncIterator,
    Optional,
)
from uuid import (
    UUID,
)

from minos.common import (
    Inject,
    Injectable,
    Lock,
    LockPool,
    NotProvidedException,
    PoolFactory,
    SetupMixin,
)

from ...exceptions import (
    TransactionNotFoundException,
)
from ..entries import (
    TransactionEntry,
    TransactionStatus,
)


[docs]@Injectable("transaction_repository") class TransactionRepository(ABC, SetupMixin): """Transaction Repository base class."""
[docs] @Inject() def __init__( self, lock_pool: Optional[LockPool] = None, pool_factory: Optional[PoolFactory] = None, *args, **kwargs ): super().__init__(*args, **kwargs) if lock_pool is None and pool_factory is not None: lock_pool = pool_factory.get_pool("lock") if lock_pool is None: raise NotProvidedException("A lock pool instance is required.") self._lock_pool = lock_pool
[docs] async def submit(self, transaction: TransactionEntry) -> TransactionEntry: """Submit a new or updated transaction to store it on the repository. :param transaction: The transaction to be stored. :return: This method does not return anything. """ return await self._submit(transaction)
@abstractmethod async def _submit(self, transaction: TransactionEntry) -> TransactionEntry: raise NotImplementedError # noinspection PyUnusedLocal
[docs] async def get(self, uuid: UUID, **kwargs) -> TransactionEntry: """Get a ``TransactionEntry`` from its identifier. :param uuid: Identifier of the ``RootEntity``. :param kwargs: Additional named arguments. :return: The ``TransactionEntry`` instance. """ try: return await self.select(uuid=uuid).__anext__() except StopAsyncIteration: raise TransactionNotFoundException(f"Transaction identified by {uuid!r} does not exist.")
[docs] async def select( self, uuid: Optional[UUID] = None, uuid_ne: Optional[UUID] = None, uuid_in: Optional[tuple[UUID, ...]] = None, destination_uuid: Optional[UUID] = None, status: Optional[TransactionStatus] = None, status_in: Optional[tuple[str, ...]] = None, event_offset: Optional[int] = None, event_offset_lt: Optional[int] = None, event_offset_gt: Optional[int] = None, event_offset_le: Optional[int] = None, event_offset_ge: Optional[int] = None, updated_at: Optional[datetime] = None, updated_at_lt: Optional[datetime] = None, updated_at_gt: Optional[datetime] = None, updated_at_le: Optional[datetime] = None, updated_at_ge: Optional[datetime] = None, **kwargs, ) -> AsyncIterator[TransactionEntry]: """Get a transaction from the repository. :param uuid: Transaction identifier equal to the given value. :param uuid_ne: Transaction identifier not equal to the given value :param uuid_in: Transaction identifier within the given values. :param destination_uuid: Destination Transaction identifier equal to the given value. :param status: Transaction status equal to the given value. :param status_in: Transaction status within the given values :param event_offset: Event offset equal to the given value. :param event_offset_lt: Event Offset lower than the given value :param event_offset_gt: Event Offset greater than the given value :param event_offset_le: Event Offset lower or equal to the given value :param event_offset_ge: Event Offset greater or equal to the given value :param updated_at: Updated at equal to the given value. :param updated_at_lt: Updated at lower than the given value. :param updated_at_gt: Updated at greater than the given value. :param updated_at_le: Updated at lower or equal to the given value. :param updated_at_ge: Updated at greater or equal to the given value. :param kwargs: Additional named arguments. :return: An asynchronous iterator. """ generator = self._select( uuid=uuid, uuid_ne=uuid_ne, uuid_in=uuid_in, destination_uuid=destination_uuid, status=status, status_in=status_in, event_offset=event_offset, event_offset_lt=event_offset_lt, event_offset_gt=event_offset_gt, event_offset_le=event_offset_le, event_offset_ge=event_offset_ge, updated_at=updated_at, updated_at_lt=updated_at_lt, updated_at_gt=updated_at_gt, updated_at_le=updated_at_le, updated_at_ge=updated_at_ge, **kwargs, ) # noinspection PyTypeChecker async for entry in generator: yield entry
@abstractmethod async def _select(self, **kwargs) -> AsyncIterator[TransactionEntry]: raise NotImplementedError
[docs] def write_lock(self) -> Lock: """Get write lock. :return: An asynchronous context manager. """ return self._lock_pool.acquire("aggregate_transaction_write_lock")