from __future__ import (
annotations,
)
from abc import (
ABC,
abstractmethod,
)
from asyncio import (
gather,
)
from contextlib import (
suppress,
)
from typing import (
TYPE_CHECKING,
AsyncIterator,
Awaitable,
Optional,
Union,
)
from uuid import (
UUID,
)
from minos.common import (
NULL_UUID,
Inject,
Injectable,
Lock,
LockPool,
NotProvidedException,
PoolFactory,
SetupMixin,
classname,
)
from minos.networks import (
BrokerMessageV1,
BrokerMessageV1Payload,
BrokerMessageV1Strategy,
BrokerPublisher,
)
from ...actions import (
Action,
)
from ...contextvars import (
IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
)
from ...exceptions import (
EventRepositoryConflictException,
EventRepositoryException,
)
from ...transactions import (
TRANSACTION_CONTEXT_VAR,
TransactionEntry,
TransactionRepository,
TransactionStatus,
)
from ..entries import (
EventEntry,
)
from ..fields import (
IncrementalFieldDiff,
)
from ..models import (
Event,
)
if TYPE_CHECKING:
from ...entities import (
RootEntity,
)
@Injectable("event_repository")
class EventRepository(ABC, SetupMixin):
    """Base event repository class in ``minos``.

    Concrete subclasses must implement ``_submit``, ``_select`` and ``_offset``. This base class adds on top of
    them: entry validation against other transactions, write locking, and the publication of the stored events
    through the broker publisher.
    """

    @Inject()
    def __init__(
        self,
        broker_publisher: BrokerPublisher,
        transaction_repository: TransactionRepository,
        lock_pool: Optional[LockPool] = None,
        pool_factory: Optional[PoolFactory] = None,
        *args,
        **kwargs,
    ):
        """Initialize the repository.

        :param broker_publisher: Publisher used to send the stored events.
        :param transaction_repository: Repository used to build and query transactions.
        :param lock_pool: Pool used to acquire the write lock. If not given, it is obtained from ``pool_factory``.
        :param pool_factory: Optional factory used to get the ``"lock"`` pool when ``lock_pool`` is not given.
        :param args: Additional positional arguments forwarded to the parent initializer.
        :param kwargs: Additional named arguments forwarded to the parent initializer.
        :raises NotProvidedException: If the broker publisher, the transaction repository or the lock pool
            is ``None``.
        """
        super().__init__(*args, **kwargs)

        # Fall back to the pool factory only when no explicit lock pool was given.
        if lock_pool is None and pool_factory is not None:
            lock_pool = pool_factory.get_pool("lock")

        if broker_publisher is None:
            raise NotProvidedException("A broker instance is required.")

        if transaction_repository is None:
            raise NotProvidedException("A transaction repository instance is required.")

        if lock_pool is None:
            raise NotProvidedException("A lock pool instance is required.")

        self._broker_publisher = broker_publisher
        self._transaction_repository = transaction_repository
        self._lock_pool = lock_pool

    def transaction(self, **kwargs) -> TransactionEntry:
        """Build a transaction instance related to the repository.

        :param kwargs: Additional named arguments forwarded to the ``TransactionEntry`` constructor.
        :return: A new ``TransactionEntry`` instance bound to this repository and to its transaction repository.
        """
        return TransactionEntry(event_repository=self, transaction_repository=self._transaction_repository, **kwargs)

    async def create(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Store new creation entry into the repository.

        The ``action`` attribute of the given entry is overwritten in place with ``Action.CREATE``.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.CREATE
        return await self.submit(entry)

    async def update(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Store new update entry into the repository.

        The ``action`` attribute of the given entry is overwritten in place with ``Action.UPDATE``.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.UPDATE
        return await self.submit(entry)

    async def delete(self, entry: Union[Event, EventEntry]) -> EventEntry:
        """Store new deletion entry into the repository.

        The ``action`` attribute of the given entry is overwritten in place with ``Action.DELETE``.

        :param entry: Entry to be stored.
        :return: The repository entry containing the stored information.
        """
        entry.action = Action.DELETE
        return await self.submit(entry)

    async def submit(self, entry: Union[Event, EventEntry], **kwargs) -> EventEntry:
        """Store new entry into the repository.

        :param entry: The entry to be stored. An ``Event`` is converted into an ``EventEntry`` bound to the
            transaction currently set on ``TRANSACTION_CONTEXT_VAR``.
        :param kwargs: Additional named arguments forwarded to ``validate`` and ``_submit``.
        :return: The repository entry containing the stored information.
        :raises EventRepositoryException: If the entry's ``action`` is not an ``Action`` instance.
        :raises EventRepositoryConflictException: If the entry does not pass the validation.
        """
        # Flag the serialization context for the whole submission; always restored in the ``finally`` below.
        token = IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.set(True)
        try:
            transaction = TRANSACTION_CONTEXT_VAR.get()

            if isinstance(entry, Event):
                entry = EventEntry.from_event(entry, transaction=transaction)

            if not isinstance(entry.action, Action):
                raise EventRepositoryException("The 'EventEntry.action' attribute must be an 'Action' instance.")

            # Validation and storage happen under the same write lock.
            async with self.write_lock():
                if not await self.validate(entry, **kwargs):
                    raise EventRepositoryConflictException(f"{entry!r} could not be committed!", await self.offset)

                entry = await self._submit(entry, **kwargs)

            # Events are only published for entries stored with the null transaction identifier.
            if entry.transaction_uuid == NULL_UUID:
                await self._send_events(entry.event)

        finally:
            IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.reset(token)

        return entry

    # noinspection PyUnusedLocal
    async def validate(self, entry: EventEntry, transaction_uuid_ne: Optional[UUID] = None, **kwargs) -> bool:
        """Check if it is able to submit the given entry.

        The entry is rejected when any transaction targeting ``entry.transaction_uuid`` whose status is
        ``RESERVING``, ``RESERVED`` or ``COMMITTING`` already holds an entry with the same ``uuid``.

        :param entry: The entry to be validated.
        :param transaction_uuid_ne: Optional transaction identifier to skip it from the validation.
        :param kwargs: Additional named arguments forwarded to ``select``.
        :return: ``True`` if the entry can be submitted or ``False`` otherwise.
        """
        transactions = self._transaction_repository.select(
            destination_uuid=entry.transaction_uuid,
            uuid_ne=transaction_uuid_ne,
            status_in=(TransactionStatus.RESERVING, TransactionStatus.RESERVED, TransactionStatus.COMMITTING),
        )
        transaction_uuids = {e.uuid async for e in transactions}

        if not transaction_uuids:
            return True

        iterable = self.select(uuid=entry.uuid, transaction_uuid_in=tuple(transaction_uuids), **kwargs)
        try:
            # Only existence matters, so a single step of the iterator is enough.
            await iterable.__anext__()
        except StopAsyncIteration:
            return True
        finally:
            # Close the partially consumed iterator right away instead of leaving it suspended until it is
            # garbage collected. ``aclose`` is looked up defensively because ``select`` is only declared to
            # return an ``AsyncIterator``, which does not guarantee that method.
            aclose = getattr(iterable, "aclose", None)
            if aclose is not None:
                await aclose()

        return False

    @abstractmethod
    async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
        """Store the given entry. To be implemented by subclasses.

        :param entry: The entry to be stored.
        :param kwargs: Additional named arguments.
        :return: The stored entry.
        """
        raise NotImplementedError

    async def _send_events(self, event: Event) -> None:
        """Publish the given event through the broker publisher.

        A message is always sent to ``"{simplified_name}{Created|Updated|Deleted}"``. For update events, one more
        message is sent per decomposed event to ``"{topic}.{field}"``, with an extra ``".{action}"`` suffix when
        the field difference is an ``IncrementalFieldDiff``. All the messages are sent concurrently.

        :param event: The event to be published.
        :return: This method does not return anything.
        """
        suffix_mapper = {
            Action.CREATE: "Created",
            Action.UPDATE: "Updated",
            Action.DELETE: "Deleted",
        }
        topic = f"{event.simplified_name}{suffix_mapper[event.action]}"
        message = BrokerMessageV1(
            topic=topic,
            payload=BrokerMessageV1Payload(content=event),
            strategy=BrokerMessageV1Strategy.MULTICAST,
        )
        futures = [self._broker_publisher.send(message)]

        if event.action == Action.UPDATE:
            for decomposed_event in event.decompose():
                # NOTE(review): only the first flattened diff is used; presumably each decomposed event carries
                # exactly one field difference -- confirm against ``Event.decompose``.
                diff = next(iter(decomposed_event.fields_diff.flatten_values()))
                composed_topic = f"{topic}.{diff.name}"
                if isinstance(diff, IncrementalFieldDiff):
                    composed_topic += f".{diff.action.value}"

                message = BrokerMessageV1(
                    topic=composed_topic,
                    payload=BrokerMessageV1Payload(content=decomposed_event),
                    strategy=BrokerMessageV1Strategy.MULTICAST,
                )
                futures.append(self._broker_publisher.send(message))

        await gather(*futures)

    # noinspection PyShadowingBuiltins
    async def select(
        self,
        uuid: Optional[UUID] = None,
        name: Optional[Union[str, type[RootEntity]]] = None,
        version: Optional[int] = None,
        version_lt: Optional[int] = None,
        version_gt: Optional[int] = None,
        version_le: Optional[int] = None,
        version_ge: Optional[int] = None,
        id: Optional[int] = None,
        id_lt: Optional[int] = None,
        id_gt: Optional[int] = None,
        id_le: Optional[int] = None,
        id_ge: Optional[int] = None,
        transaction_uuid: Optional[UUID] = None,
        transaction_uuid_ne: Optional[UUID] = None,
        transaction_uuid_in: Optional[tuple[UUID, ...]] = None,
        **kwargs,
    ) -> AsyncIterator[EventEntry]:
        """Perform a selection query of entries stored in to the repository.

        :param uuid: The identifier must be equal to the given value.
        :param name: The classname must be equal to the given value. A class is converted into its classname.
        :param version: The version must be equal to the given value.
        :param version_lt: The version must be lower than the given value.
        :param version_gt: The version must be greater than the given value.
        :param version_le: The version must be lower or equal to the given value.
        :param version_ge: The version must be greater or equal to the given value.
        :param id: The entry identifier must be equal to the given value.
        :param id_lt: The entry identifier must be lower than the given value.
        :param id_gt: The entry identifier must be greater than the given value.
        :param id_le: The entry identifier must be lower or equal to the given value.
        :param id_ge: The entry identifier must be greater or equal to the given value.
        :param transaction_uuid: The transaction identifier must be equal to the given value.
        :param transaction_uuid_ne: The transaction identifier must be distinct of the given value.
        :param transaction_uuid_in: The transaction identifier must be equal to one of the given values.
        :param kwargs: Additional named arguments forwarded to ``_select``.
        :return: An asynchronous iterator that yields the matching entries.
        """
        if isinstance(name, type):
            name = classname(name)

        generator = self._select(
            uuid=uuid,
            name=name,
            version=version,
            version_lt=version_lt,
            version_gt=version_gt,
            version_le=version_le,
            version_ge=version_ge,
            id=id,
            id_lt=id_lt,
            id_gt=id_gt,
            id_le=id_le,
            id_ge=id_ge,
            transaction_uuid=transaction_uuid,
            transaction_uuid_ne=transaction_uuid_ne,
            transaction_uuid_in=transaction_uuid_in,
            **kwargs,
        )
        # noinspection PyTypeChecker
        async for entry in generator:
            yield entry

    @abstractmethod
    async def _select(self, *args, **kwargs) -> AsyncIterator[EventEntry]:
        """Perform a selection query of entries stored in to the repository."""

    @property
    def offset(self) -> Awaitable[int]:
        """Get the current repository offset.

        :return: An awaitable containing an integer value.
        """
        return self._offset

    @property
    @abstractmethod
    async def _offset(self) -> int:
        """Get the current repository offset. To be implemented by subclasses."""
        raise NotImplementedError

    def write_lock(self) -> Lock:
        """Get a write lock.

        The lock is acquired from the lock pool under the ``"aggregate_event_write_lock"`` key.

        :return: An asynchronous context manager.
        """
        return self._lock_pool.acquire("aggregate_event_write_lock")