Source code for minos.aggregate.events.repositories.memory

from __future__ import (
    annotations,
)

from collections import (
    defaultdict,
)
from itertools import (
    count,
)
from typing import (
    AsyncIterator,
    Optional,
)
from uuid import (
    UUID,
    uuid4,
)

from minos.common import (
    NULL_UUID,
    current_datetime,
)

from ...exceptions import (
    EventRepositoryConflictException,
)
from ..entries import (
    EventEntry,
)
from .abc import (
    EventRepository,
)


[docs]class InMemoryEventRepository(EventRepository): """Memory-based implementation of the event repository class in ``minos``."""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._storage = list() self._id_generator = count() self._next_versions = defaultdict(int)
async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry: if entry.uuid == NULL_UUID: entry.uuid = uuid4() next_version = self._get_next_version_id(entry) if entry.version is None: entry.version = next_version if entry.version < next_version: raise EventRepositoryConflictException( f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision", await self.offset, ) if entry.created_at is None: entry.created_at = current_datetime() entry.id = self._generate_next_id() self._storage.append(entry) return entry def _generate_next_id(self) -> int: return next(self._id_generator) + 1 def _get_next_version_id(self, entry: EventEntry) -> int: key = (entry.name, entry.uuid, entry.transaction_uuid) self._next_versions[key] += 1 return self._next_versions[key] async def _select( self, uuid: Optional[int] = None, name: Optional[str] = None, version: Optional[int] = None, version_lt: Optional[int] = None, version_gt: Optional[int] = None, version_le: Optional[int] = None, version_ge: Optional[int] = None, id: Optional[int] = None, id_lt: Optional[int] = None, id_gt: Optional[int] = None, id_le: Optional[int] = None, id_ge: Optional[int] = None, transaction_uuid: Optional[UUID] = None, transaction_uuid_ne: Optional[UUID] = None, transaction_uuid_in: Optional[tuple[UUID, ...]] = None, *args, **kwargs, ) -> AsyncIterator[EventEntry]: # noinspection DuplicatedCode def _fn_filter(entry: EventEntry) -> bool: if uuid is not None and uuid != entry.uuid: return False if name is not None and name != entry.name: return False if version is not None and version != entry.version: return False if version_lt is not None and version_lt <= entry.version: return False if version_gt is not None and version_gt >= entry.version: return False if version_le is not None and version_le < entry.version: return False if version_ge is not None and version_ge > entry.version: return False if id is not None and id != entry.id: return False if id_lt is not None and id_lt <= entry.id: return False if id_gt is not None and id_gt >= entry.id: return False if id_le is not None and id_le < entry.id: return False if id_ge is not None and id_ge > entry.id: return False if transaction_uuid is not None and transaction_uuid != entry.transaction_uuid: return False if transaction_uuid_ne is not None and transaction_uuid_ne == entry.transaction_uuid: return False if transaction_uuid_in is not None and entry.transaction_uuid not in transaction_uuid_in: return False return True iterable = iter(self._storage) iterable = filter(_fn_filter, iterable) for item in iterable: yield item @property async def _offset(self) -> int: return len(self._storage)