Source code for minos.aggregate.events.repositories.abc

from __future__ import (
    annotations,
)

from abc import (
    ABC,
    abstractmethod,
)
from asyncio import (
    gather,
)
from contextlib import (
    suppress,
)
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Awaitable,
    Optional,
    Union,
)
from uuid import (
    UUID,
)

from minos.common import (
    NULL_UUID,
    Inject,
    Injectable,
    Lock,
    LockPool,
    NotProvidedException,
    PoolFactory,
    SetupMixin,
    classname,
)
from minos.networks import (
    BrokerMessageV1,
    BrokerMessageV1Payload,
    BrokerMessageV1Strategy,
    BrokerPublisher,
)

from ...actions import (
    Action,
)
from ...contextvars import (
    IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
)
from ...exceptions import (
    EventRepositoryConflictException,
    EventRepositoryException,
)
from ...transactions import (
    TRANSACTION_CONTEXT_VAR,
    TransactionEntry,
    TransactionRepository,
    TransactionStatus,
)
from ..entries import (
    EventEntry,
)
from ..fields import (
    IncrementalFieldDiff,
)
from ..models import (
    Event,
)

if TYPE_CHECKING:
    from ...entities import (
        RootEntity,
    )


@Injectable("event_repository")
class EventRepository(ABC, SetupMixin):
    """Abstract base class for the event repositories in ``minos``.

    Concrete subclasses provide the storage layer by implementing ``_submit``, ``_select`` and ``_offset``.
    """

    @Inject()
    def __init__(
        self,
        broker_publisher: BrokerPublisher,
        transaction_repository: TransactionRepository,
        lock_pool: Optional[LockPool] = None,
        pool_factory: Optional[PoolFactory] = None,
        *args,
        **kwargs,
    ):
        """Initialize the repository with its collaborators.

        :param broker_publisher: Publisher used to send the event messages.
        :param transaction_repository: Repository used to query transactions and to build transaction entries.
        :param lock_pool: Pool from which the write lock is acquired. If omitted, it is taken from
            ``pool_factory.get_pool("lock")`` when a ``pool_factory`` is given.
        :param pool_factory: Optional factory used only as a fallback source for ``lock_pool``.
        :param args: Additional positional arguments forwarded to the parent initializer.
        :param kwargs: Additional named arguments forwarded to the parent initializer.
        :raises NotProvidedException: If the broker publisher, the transaction repository or the lock pool
            is ``None``.
        """
        super().__init__(*args, **kwargs)

        if lock_pool is None and pool_factory is not None:
            lock_pool = pool_factory.get_pool("lock")

        # Checked in this order so that the first missing dependency determines the reported message.
        required = (
            (broker_publisher, "A broker instance is required."),
            (transaction_repository, "A transaction repository instance is required."),
            (lock_pool, "A lock pool instance is required."),
        )
        for dependency, message in required:
            if dependency is None:
                raise NotProvidedException(message)

        self._broker_publisher = broker_publisher
        self._transaction_repository = transaction_repository
        self._lock_pool = lock_pool

    def transaction(self, **kwargs) -> TransactionEntry:
        """Create a transaction entry bound to this repository.

        :param kwargs: Additional named arguments forwarded to the ``TransactionEntry`` constructor.
        :return: A new ``TransactionEntry`` instance.
        """
        return TransactionEntry(
            event_repository=self,
            transaction_repository=self._transaction_repository,
            **kwargs,
        )

    async def create(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Submit the given entry as a creation.

        The ``action`` attribute of ``entry`` is overwritten with ``Action.CREATE`` before submitting.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.CREATE
        stored = await self.submit(entry)
        return stored

    async def update(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Submit the given entry as an update.

        The ``action`` attribute of ``entry`` is overwritten with ``Action.UPDATE`` before submitting.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.UPDATE
        stored = await self.submit(entry)
        return stored

    async def delete(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Submit the given entry as a deletion.

        The ``action`` attribute of ``entry`` is overwritten with ``Action.DELETE`` before submitting.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.DELETE
        stored = await self.submit(entry)
        return stored

    async def submit(self, entry: Union[Event, EventEntry], **kwargs) -> EventEntry:
        """Validate and store a new entry, publishing its event when it is not part of a transaction.

        :param entry: The entry to be stored. An ``Event`` is first converted into an ``EventEntry``.
        :param kwargs: Additional named arguments forwarded to ``validate`` and ``_submit``.
        :return: The repository entry containing the stored information.
        :raises EventRepositoryException: If ``entry.action`` is not an ``Action`` instance.
        :raises EventRepositoryConflictException: If ``validate`` rejects the entry.
        """
        token = IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.set(True)
        try:
            current_transaction = TRANSACTION_CONTEXT_VAR.get()

            if isinstance(entry, Event):
                entry = EventEntry.from_event(entry, transaction=current_transaction)

            if not isinstance(entry.action, Action):
                raise EventRepositoryException("The 'EventEntry.action' attribute must be an 'Action' instance.")

            # Validation and storage are performed while holding the write lock.
            async with self.write_lock():
                is_valid = await self.validate(entry, **kwargs)
                if not is_valid:
                    raise EventRepositoryConflictException(f"{entry!r} could not be committed!", await self.offset)

                entry = await self._submit(entry, **kwargs)

            # Only entries whose transaction identifier is ``NULL_UUID`` are published here.
            if entry.transaction_uuid == NULL_UUID:
                await self._send_events(entry.event)
        finally:
            # Restore the previous value of the context variable whatever the outcome is.
            IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.reset(token)

        return entry

    # noinspection PyUnusedLocal
    async def validate(self, entry: EventEntry, transaction_uuid_ne: Optional[UUID] = None, **kwargs) -> bool:
        """Check whether the given entry can be submitted.

        The entry is rejected when another transaction in ``RESERVING``, ``RESERVED`` or ``COMMITTING`` status,
        with the entry's transaction as destination, already holds an entry with the same ``uuid``.

        :param entry: The entry to be validated.
        :param transaction_uuid_ne: Optional transaction identifier to skip it from the validation.
        :param kwargs: Additional named arguments forwarded to ``select``.
        :return: ``True`` if the entry can be submitted or ``False`` otherwise.
        """
        ongoing_transactions = self._transaction_repository.select(
            destination_uuid=entry.transaction_uuid,
            uuid_ne=transaction_uuid_ne,
            status_in=(TransactionStatus.RESERVING, TransactionStatus.RESERVED, TransactionStatus.COMMITTING),
        )
        ongoing_uuids = {transaction.uuid async for transaction in ongoing_transactions}

        if not ongoing_uuids:
            return True

        with suppress(StopAsyncIteration):
            conflicts = self.select(uuid=entry.uuid, transaction_uuid_in=tuple(ongoing_uuids), **kwargs)
            await conflicts.__anext__()  # Raises ``StopAsyncIteration`` (suppressed) when there are no items.
            return False

        return True

    @abstractmethod
    async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
        """Store the given entry. To be implemented by concrete repositories.

        :param entry: The entry to be stored.
        :param kwargs: Additional named arguments.
        :return: The stored entry.
        """
        raise NotImplementedError

    async def _send_events(self, event: Event):
        """Publish the given event through the broker publisher.

        One message is always sent to ``{event.simplified_name}{Created|Updated|Deleted}``. For update events,
        one additional message is sent for each event yielded by ``event.decompose()``, using the main topic
        suffixed with the name of its first field difference (and with the action value when that difference
        is an ``IncrementalFieldDiff``).

        :param event: The event to be published.
        :return: This method does not return anything.
        """
        suffix_mapper = {
            Action.CREATE: "Created",
            Action.UPDATE: "Updated",
            Action.DELETE: "Deleted",
        }
        topic = f"{event.simplified_name}{suffix_mapper[event.action]}"

        def _publish(target_topic, content):
            # Build a multicast message and hand it to the publisher without awaiting it yet.
            message = BrokerMessageV1(
                topic=target_topic,
                payload=BrokerMessageV1Payload(content=content),
                strategy=BrokerMessageV1Strategy.MULTICAST,
            )
            return self._broker_publisher.send(message)

        futures = [_publish(topic, event)]

        if event.action == Action.UPDATE:
            for decomposed_event in event.decompose():
                diff = next(iter(decomposed_event.fields_diff.flatten_values()))
                composed_topic = f"{topic}.{diff.name}"
                if isinstance(diff, IncrementalFieldDiff):
                    composed_topic = f"{composed_topic}.{diff.action.value}"
                futures.append(_publish(composed_topic, decomposed_event))

        # All the sends are awaited together.
        await gather(*futures)

    # noinspection PyShadowingBuiltins
    async def select(
        self,
        uuid: Optional[UUID] = None,
        name: Optional[Union[str, type[RootEntity]]] = None,
        version: Optional[int] = None,
        version_lt: Optional[int] = None,
        version_gt: Optional[int] = None,
        version_le: Optional[int] = None,
        version_ge: Optional[int] = None,
        id: Optional[int] = None,
        id_lt: Optional[int] = None,
        id_gt: Optional[int] = None,
        id_le: Optional[int] = None,
        id_ge: Optional[int] = None,
        transaction_uuid: Optional[UUID] = None,
        transaction_uuid_ne: Optional[UUID] = None,
        transaction_uuid_in: Optional[tuple[UUID, ...]] = None,
        **kwargs,
    ) -> AsyncIterator[EventEntry]:
        """Perform a selection query over the entries stored in the repository.

        :param uuid: The identifier must be equal to the given value.
        :param name: The classname must be equal to the given value. A class is converted to its classname.
        :param version: The version must be equal to the given value.
        :param version_lt: The version must be lower than the given value.
        :param version_gt: The version must be greater than the given value.
        :param version_le: The version must be lower or equal to the given value.
        :param version_ge: The version must be greater or equal to the given value.
        :param id: The entry identifier must be equal to the given value.
        :param id_lt: The entry identifier must be lower than the given value.
        :param id_gt: The entry identifier must be greater than the given value.
        :param id_le: The entry identifier must be lower or equal to the given value.
        :param id_ge: The entry identifier must be greater or equal to the given value.
        :param transaction_uuid: The transaction identifier must be equal to the given value.
        :param transaction_uuid_ne: The transaction identifier must be distinct of the given value.
        :param transaction_uuid_in: The transaction identifier must be equal to one of the given values.
        :param kwargs: Additional named arguments forwarded to ``_select``.
        :return: An asynchronous iterator over the matching entries.
        """
        if isinstance(name, type):
            name = classname(name)

        entries = self._select(
            uuid=uuid,
            name=name,
            version=version,
            version_lt=version_lt,
            version_gt=version_gt,
            version_le=version_le,
            version_ge=version_ge,
            id=id,
            id_lt=id_lt,
            id_gt=id_gt,
            id_le=id_le,
            id_ge=id_ge,
            transaction_uuid=transaction_uuid,
            transaction_uuid_ne=transaction_uuid_ne,
            transaction_uuid_in=transaction_uuid_in,
            **kwargs,
        )
        # noinspection PyTypeChecker
        async for item in entries:
            yield item

    @abstractmethod
    async def _select(self, *args, **kwargs) -> AsyncIterator[EventEntry]:
        """Perform a selection query over the stored entries. To be implemented by concrete repositories."""

    @property
    def offset(self) -> Awaitable[int]:
        """Get the current repository offset.

        :return: An awaitable containing an integer value.
        """
        return self._offset

    @property
    @abstractmethod
    async def _offset(self) -> int:
        """Compute the current repository offset. To be implemented by concrete repositories."""
        raise NotImplementedError

    def write_lock(self) -> Lock:
        """Get the write lock of the repository.

        The lock is acquired from the lock pool under the ``"aggregate_event_write_lock"`` key.

        :return: An asynchronous context manager.
        """
        return self._lock_pool.acquire("aggregate_event_write_lock")