Source code for minos.aggregate.entities.models

from __future__ import (
    annotations,
)

import logging
from datetime import (
    datetime,
)
from typing import (
    AsyncIterator,
    Optional,
    Type,
    TypeVar,
)
from uuid import (
    UUID,
    uuid4,
)

from minos.common import (
    NULL_DATETIME,
    NULL_UUID,
    DeclarativeModel,
    Inject,
    NotProvidedException,
)

from ..events import (
    Event,
    EventEntry,
    EventRepository,
    IncrementalFieldDiff,
)
from ..exceptions import (
    EventRepositoryException,
)
from ..queries import (
    _Condition,
    _Ordering,
)
from ..snapshots import (
    SnapshotRepository,
)

logger = logging.getLogger(__name__)


[docs]class Entity(DeclarativeModel): """Entity class.""" uuid: UUID
[docs] def __init__(self, *args, uuid: Optional[UUID] = None, **kwargs): if uuid is None: uuid = uuid4() super().__init__(uuid, *args, **kwargs)
[docs]class ExternalEntity(Entity): """External Entity class.""" version: int
[docs] def __init__(self, uuid: UUID, *args, **kwargs): super().__init__(uuid=uuid, *args, **kwargs)
T = TypeVar("T", bound="RootEntity")
[docs]class RootEntity(Entity): """Base Root Entity class.""" version: int created_at: datetime updated_at: datetime _event_repository: EventRepository _snapshot_repository: SnapshotRepository
[docs] @Inject() def __init__( self, *args, uuid: UUID = NULL_UUID, version: int = 0, created_at: datetime = NULL_DATETIME, updated_at: datetime = NULL_DATETIME, _event_repository: EventRepository, _snapshot_repository: SnapshotRepository, **kwargs, ): super().__init__(version, created_at, updated_at, *args, uuid=uuid, **kwargs) if _event_repository is None: raise NotProvidedException(f"A {EventRepository!r} instance is required.") if _snapshot_repository is None: raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.") self._event_repository = _event_repository self._snapshot_repository = _snapshot_repository
[docs] @classmethod @Inject() async def get(cls: Type[T], uuid: UUID, *, _snapshot_repository: SnapshotRepository, **kwargs) -> T: """Get one instance from the database based on its identifier. :param uuid: The identifier of the instance. :param _snapshot_repository: Snapshot to be set to the root entity. :return: A ``RootEntity`` instance. """ if _snapshot_repository is None: raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.") # noinspection PyTypeChecker return await _snapshot_repository.get(cls.classname, uuid, _snapshot_repository=_snapshot_repository, **kwargs)
[docs] @classmethod @Inject() def get_all( cls: Type[T], ordering: Optional[_Ordering] = None, limit: Optional[int] = None, *, _snapshot_repository: SnapshotRepository, **kwargs, ) -> AsyncIterator[T]: """Get all instance from the database. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param _snapshot_repository: Snapshot to be set to the root entity. :return: A ``RootEntity`` instance. """ if _snapshot_repository is None: raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.") # noinspection PyTypeChecker return _snapshot_repository.get_all( cls.classname, ordering, limit, _snapshot_repository=_snapshot_repository, **kwargs )
[docs] @classmethod @Inject() def find( cls: Type[T], condition: _Condition, ordering: Optional[_Ordering] = None, limit: Optional[int] = None, *, _snapshot_repository: SnapshotRepository, **kwargs, ) -> AsyncIterator[T]: """Find a collection of instances based on a given ``Condition``. :param condition: The ``Condition`` that must be satisfied by all the instances. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param _snapshot_repository: Snapshot to be set to the instances. :return: An asynchronous iterator of ``RootEntity`` instances. """ if _snapshot_repository is None: raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.") # noinspection PyTypeChecker return _snapshot_repository.find( cls.classname, condition, ordering, limit, _snapshot_repository=_snapshot_repository, **kwargs )
[docs] @classmethod async def create(cls: Type[T], *args, **kwargs) -> T: """Create a new ``RootEntity`` instance. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A new ``RootEntity`` instance. """ if "uuid" in kwargs: raise EventRepositoryException( f"The identifier must be computed internally on the repository. Obtained: {kwargs['uuid']}" ) if "version" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['version']}" ) if "created_at" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['created_at']}" ) if "updated_at" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['updated_at']}" ) instance: T = cls(*args, **kwargs) event = Event.from_root_entity(instance) entry = await instance._event_repository.submit(event) instance._update_from_repository_entry(entry) return instance
# noinspection PyMethodParameters,PyShadowingBuiltins
[docs] async def update(self: T, **kwargs) -> T: """Update an existing ``RootEntity`` instance. :param kwargs: Additional named arguments. :return: An updated ``RootEntity`` instance. """ if "version" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['version']}" ) if "created_at" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['created_at']}" ) if "updated_at" in kwargs: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {kwargs['updated_at']}" ) for key, value in kwargs.items(): setattr(self, key, value) previous = await self.get( self.uuid, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository ) event = self.diff(previous) if not len(event.fields_diff): return self entry = await self._event_repository.submit(event) self._update_from_repository_entry(entry) return self
[docs] async def save(self) -> None: """Store the current instance on the repository. If didn't exist previously creates a new one, otherwise updates the existing one. """ is_creation = self.uuid == NULL_UUID if is_creation != (self.version == 0): if is_creation: raise EventRepositoryException( f"The version must be computed internally on the repository. Obtained: {self.version}" ) else: raise EventRepositoryException( f"The uuid must be computed internally on the repository. Obtained: {self.uuid}" ) values = { k: field.value for k, field in self.fields.items() if k not in {"uuid", "version", "created_at", "updated_at"} } if is_creation: new = await self.create( **values, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository ) self._fields |= new.fields else: await self.update( **values, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository )
[docs] async def refresh(self) -> None: """Refresh the state of the given instance. :return: This method does not return anything. """ new = await self.get( self.uuid, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository ) self._fields |= new.fields
[docs] async def delete(self) -> None: """Delete the given root entity instance. :return: This method does not return anything. """ event = Event.from_deleted_root_entity(self) entry = await self._event_repository.submit(event) self._update_from_repository_entry(entry)
def _update_from_repository_entry(self, entry: EventEntry) -> None: self.uuid = entry.uuid self.version = entry.version if entry.action.is_create: self.created_at = entry.created_at self.updated_at = entry.created_at
[docs] def diff(self, another: RootEntity) -> Event: """Compute the difference with another instance. Both ``RootEntity`` instances (``self`` and ``another``) must share the same ``uuid`` value. :param another: Another ``RootEntity`` instance. :return: An ``FieldDiffContainer`` instance. """ return Event.from_difference(self, another)
[docs] def apply_diff(self, event: Event) -> None: """Apply the differences over the instance. :param event: The ``FieldDiffContainer`` containing the values to be set. :return: This method does not return anything. """ if self.uuid != event.uuid: raise ValueError( f"To apply the difference, it must have same uuid. " f"Expected: {self.uuid!r} Obtained: {event.uuid!r}" ) logger.debug(f"Applying {event!r} to {self!r}...") for diff in event.fields_diff.flatten_values(): if isinstance(diff, IncrementalFieldDiff): container = getattr(self, diff.name) if diff.action.is_delete: container.discard(diff.value) else: container.add(diff.value) else: setattr(self, diff.name, diff.value) self.version = event.version self.updated_at = event.created_at
[docs] @classmethod def from_diff(cls: Type[T], event: Event, *args, **kwargs) -> T: """Build a new instance from an ``Event``. :param event: The difference that contains the data. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A new ``RootEntity`` instance. """ return cls( *args, uuid=event.uuid, version=event.version, created_at=event.created_at, updated_at=event.created_at, **event.get_fields(), **kwargs, )