Source code for minos.aggregate.snapshots.repositories.abc

from __future__ import (
    annotations,
)

from abc import (
    ABC,
    abstractmethod,
)
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Awaitable,
    Optional,
    Union,
)
from uuid import (
    UUID,
)

from minos.common import (
    Injectable,
    SetupMixin,
    classname,
)

from ...exceptions import (
    NotFoundException,
)
from ...queries import (
    _TRUE_CONDITION,
    _Condition,
    _EqualCondition,
    _Ordering,
)
from ...transactions import (
    TRANSACTION_CONTEXT_VAR,
    TransactionEntry,
)
from ..entries import (
    SnapshotEntry,
)

if TYPE_CHECKING:
    from ...entities import (
        RootEntity,
    )


[docs]@Injectable("snapshot_repository") class SnapshotRepository(ABC, SetupMixin): """Base Snapshot class. The snapshot provides a direct accessor to the ``RootEntity`` instances stored as events by the event repository class. """
[docs] async def get( self, name: Union[str, type[RootEntity]], uuid: UUID, transaction: Optional[TransactionEntry] = None, **kwargs, ) -> RootEntity: """Get a ``RootEntity`` instance from its identifier. :param name: Class name of the ``RootEntity``. :param uuid: Identifier of the ``RootEntity``. :param transaction: The transaction within the operation is performed. If not any value is provided, then the transaction is extracted from the context var. If not any transaction is being scoped then the query is performed to the global snapshot. :param kwargs: Additional named arguments. :return: The ``RootEntity`` instance. """ snapshot_entry = await self.get_entry(name, uuid, transaction=transaction, **kwargs) instance = snapshot_entry.build(**kwargs) return instance
[docs] async def get_entry(self, name: str, uuid: UUID, **kwargs) -> SnapshotEntry: """Get a ``SnapshotEntry`` from its identifier. :param name: Class name of the ``RootEntity``. :param uuid: Identifier of the ``RootEntity``. :param kwargs: Additional named arguments. :return: The ``SnapshotEntry`` instance. """ try: return await self.find_entries( name, _EqualCondition("uuid", uuid), **kwargs | {"exclude_deleted": False} ).__anext__() except StopAsyncIteration: raise NotFoundException(f"The instance could not be found: {uuid!s}")
[docs] def get_all( self, name: str, ordering: Optional[_Ordering] = None, limit: Optional[int] = None, streaming_mode: bool = False, transaction: Optional[TransactionEntry] = None, **kwargs, ) -> AsyncIterator[RootEntity]: """Get all ``RootEntity`` instances. :param name: Class name of the ``RootEntity``. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open database connection), otherwise preloads the full set of values on memory and then retrieves them. :param transaction: The transaction within the operation is performed. If not any value is provided, then the transaction is extracted from the context var. If not any transaction is being scoped then the query is performed to the global snapshot. :param kwargs: Additional named arguments. :return: An asynchronous iterator that containing the ``RootEntity`` instances. """ return self.find( name, _TRUE_CONDITION, ordering=ordering, limit=limit, streaming_mode=streaming_mode, transaction=transaction, **kwargs, )
[docs] async def find( self, name: Union[str, type[RootEntity]], condition: _Condition, ordering: Optional[_Ordering] = None, limit: Optional[int] = None, streaming_mode: bool = False, transaction: Optional[TransactionEntry] = None, **kwargs, ) -> AsyncIterator[RootEntity]: """Find a collection of ``RootEntity`` instances based on a ``Condition``. :param name: Class name of the ``RootEntity``. :param condition: The condition that must be satisfied by the ``RootEntity`` instances. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open database connection), otherwise preloads the full set of values on memory and then retrieves them. :param transaction: The transaction within the operation is performed. If not any value is provided, then the transaction is extracted from the context var. If not any transaction is being scoped then the query is performed to the global snapshot. :param kwargs: Additional named arguments. :return: An asynchronous iterator that containing the ``RootEntity`` instances. """ iterable = self.find_entries( name=name, condition=condition, ordering=ordering, limit=limit, streaming_mode=streaming_mode, transaction=transaction, **kwargs, ) async for snapshot_entry in iterable: yield snapshot_entry.build(**kwargs)
[docs] async def find_entries( self, name: str, condition: _Condition, ordering: Optional[_Ordering] = None, limit: Optional[int] = None, streaming_mode: bool = False, transaction: Optional[TransactionEntry] = None, exclude_deleted: bool = True, synchronize: bool = True, **kwargs, ) -> AsyncIterator[SnapshotEntry]: """Find a collection of ``SnapshotEntry`` instances based on a ``Condition``. :param name: Class name of the ``RootEntity``. :param condition: The condition that must be satisfied by the ``RootEntity`` instances. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open database connection), otherwise preloads the full set of values on memory and then retrieves them. :param transaction: The transaction within the operation is performed. If not any value is provided, then the transaction is extracted from the context var. If not any transaction is being scoped then the query is performed to the global snapshot. :param exclude_deleted: If ``True``, deleted ``RootEntity`` entries are included, otherwise deleted ``RootEntity`` entries are filtered. :param synchronize: If ``True`` a synchronization is performed before processing the query, otherwise the query is performed without any synchronization step. :param kwargs: Additional named arguments. :return: An asynchronous iterator that containing the ``RootEntity`` instances. """ if isinstance(name, type): name = classname(name) if transaction is None: transaction = TRANSACTION_CONTEXT_VAR.get() if synchronize: await self.synchronize(**kwargs) iterable = self._find_entries( name=name, condition=condition, ordering=ordering, limit=limit, streaming_mode=streaming_mode, transaction=transaction, exclude_deleted=exclude_deleted, **kwargs, ) async for entry in iterable: yield entry
@abstractmethod def _find_entries( self, name: str, condition: _Condition, ordering: Optional[_Ordering], limit: Optional[int], streaming_mode: bool, transaction: Optional[TransactionEntry], exclude_deleted: bool, **kwargs, ) -> AsyncIterator[SnapshotEntry]: raise NotImplementedError
[docs] def synchronize(self, **kwargs) -> Awaitable[None]: """Synchronize the snapshot to the latest available version. :param kwargs: Additional named arguments. :return: This method does not return anything. """ return self._synchronize(**kwargs, synchronize=False)
@abstractmethod async def _synchronize(self, **kwargs) -> None: raise NotImplementedError