Source code for minos.aggregate.transactions.entries

from __future__ import (
    annotations,
)

import logging
from contextlib import (
    suppress,
)
from contextvars import (
    Token,
)
from datetime import (
    datetime,
)
from enum import (
    Enum,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Iterable,
    Optional,
    Union,
)
from uuid import (
    UUID,
    uuid4,
)

from minos.common import (
    NULL_UUID,
    Inject,
    NotProvidedException,
)

from ..exceptions import (
    EventRepositoryConflictException,
)
from .contextvars import (
    TRANSACTION_CONTEXT_VAR,
)

if TYPE_CHECKING:
    from ..events import (
        EventRepository,
    )
    from .repositories import (
        TransactionRepository,
    )

logger = logging.getLogger(__name__)


[docs]class TransactionEntry: """Transaction Entry class.""" __slots__ = ( "uuid", "status", "event_offset", "destination_uuid", "updated_at", "_autocommit", "_event_repository", "_transaction_repository", "_token", )
[docs] def __init__( self, uuid: Optional[UUID] = None, status: Union[str, TransactionStatus] = None, event_offset: Optional[int] = None, destination_uuid: Optional[UUID] = None, updated_at: Optional[datetime] = None, autocommit: bool = True, event_repository: Optional[EventRepository] = None, transaction_repository: Optional[TransactionRepository] = None, ): if event_repository is None: from ..events import ( EventRepository, ) with suppress(NotProvidedException): event_repository = Inject.resolve(EventRepository) if transaction_repository is None: from .repositories import ( TransactionRepository, ) with suppress(NotProvidedException): transaction_repository = Inject.resolve(TransactionRepository) if uuid is None: uuid = uuid4() if status is None: status = TransactionStatus.PENDING if not isinstance(status, TransactionStatus): status = TransactionStatus.value_of(status) if destination_uuid is None: outer = TRANSACTION_CONTEXT_VAR.get() outer_uuid = getattr(outer, "uuid", NULL_UUID) destination_uuid = outer_uuid self.uuid = uuid self.status = status self.event_offset = event_offset self.destination_uuid = destination_uuid self.updated_at = updated_at self._autocommit = autocommit self._event_repository = event_repository self._transaction_repository = transaction_repository self._token = None
async def __aenter__(self): if self.status != TransactionStatus.PENDING: raise ValueError(f"Current status is not {TransactionStatus.PENDING!r}. Obtained: {self.status!r}") outer = TRANSACTION_CONTEXT_VAR.get() outer_uuid = getattr(outer, "uuid", NULL_UUID) if outer_uuid != self.destination_uuid: raise ValueError(f"{self!r} requires to be run on top of {outer!r}") self._token = TRANSACTION_CONTEXT_VAR.set(self) await self.save() return self async def __aexit__(self, exc_type, exc_val, exc_tb): TRANSACTION_CONTEXT_VAR.reset(self._token) if self._autocommit and self.status == TransactionStatus.PENDING: await self.commit()
[docs] async def commit(self) -> None: """Commit transaction changes. :return: This method does not return anything. """ if self.status == TransactionStatus.PENDING: await self.reserve() if self.status != TransactionStatus.RESERVED: raise ValueError(f"Current status is not {TransactionStatus.RESERVED!r}. Obtained: {self.status!r}") async with self._transaction_repository.write_lock(): await self.save(status=TransactionStatus.COMMITTING) await self._commit() event_offset = 1 + await self._event_repository.offset await self.save(event_offset=event_offset, status=TransactionStatus.COMMITTED)
async def _commit(self) -> None: from ..events import ( EventEntry, ) async for entry in self._event_repository.select(transaction_uuid=self.uuid): new = EventEntry.from_another(entry, transaction_uuid=self.destination_uuid) await self._event_repository.submit(new, transaction_uuid_ne=self.uuid)
[docs] async def reserve(self) -> None: """Reserve transaction changes to be ensured that they can be applied. :return: This method does not return anything. """ if self.status != TransactionStatus.PENDING: raise ValueError(f"Current status is not {TransactionStatus.PENDING!r}. Obtained: {self.status!r}") async with self._transaction_repository.write_lock(): async with self._event_repository.write_lock(): await self.save(status=TransactionStatus.RESERVING) committable = await self.validate() status = TransactionStatus.RESERVED if committable else TransactionStatus.REJECTED event_offset = 1 + await self._event_repository.offset await self.save(event_offset=event_offset, status=status) if not committable: raise EventRepositoryConflictException(f"{self!r} could not be reserved!", event_offset)
[docs] async def validate(self) -> bool: """Check if the transaction is committable. :return: ``True`` if the transaction is valid or ``False`` otherwise. """ with suppress(StopAsyncIteration): iterable = self._transaction_repository.select( uuid=self.destination_uuid, status_in=( TransactionStatus.RESERVING, TransactionStatus.RESERVED, TransactionStatus.COMMITTING, TransactionStatus.COMMITTED, TransactionStatus.REJECTED, ), ) await iterable.__anext__() # Will raise a `StopAsyncIteration` exception if not any item. return False entries = dict() async for entry in self._event_repository.select(transaction_uuid=self.uuid): if entry.uuid in entries and entry.version < entries[entry.uuid]: continue entries[entry.uuid] = entry.version transaction_uuids = set() for uuid, version in entries.items(): async for entry in self._event_repository.select(uuid=uuid, version=version): if entry.transaction_uuid == self.destination_uuid: return False if entry.transaction_uuid != self.uuid: transaction_uuids.add(entry.transaction_uuid) if len(transaction_uuids): with suppress(StopAsyncIteration): iterable = self._transaction_repository.select( destination_uuid=self.destination_uuid, uuid_in=tuple(transaction_uuids), status_in=( TransactionStatus.RESERVING, TransactionStatus.RESERVED, TransactionStatus.COMMITTING, TransactionStatus.COMMITTED, ), ) await iterable.__anext__() # Will raise a `StopAsyncIteration` exception if not any item. return False return True
[docs] async def reject(self) -> None: """Reject transaction changes. :return: This method does not return anything. """ if self.status not in (TransactionStatus.PENDING, TransactionStatus.RESERVED): raise ValueError( f"Current status is not in {(TransactionStatus.PENDING, TransactionStatus.RESERVED)!r}. " f"Obtained: {self.status!r}" ) async with self._transaction_repository.write_lock(): event_offset = 1 + await self._event_repository.offset await self.save(event_offset=event_offset, status=TransactionStatus.REJECTED)
[docs] async def save(self, *, event_offset: Optional[int] = None, status: Optional[TransactionStatus] = None) -> None: """Saves the transaction into the repository. :param event_offset: The event offset. :param status: The status. :return: This method does not return anything. """ if event_offset is not None: self.event_offset = event_offset if status is not None: self.status = status await self._transaction_repository.submit(self)
@property async def uuids(self) -> tuple[UUID, ...]: """Get the sequence of transaction identifiers, from the outer one (``NULL_UUID``) to the one related with self. :return: A tuple of ``UUID`` values. """ uuids = [] current = self while current is not None: uuids.append(current.uuid) current = await current.destination # noinspection PyRedundantParentheses return (NULL_UUID, *uuids[::-1]) @property async def destination(self) -> Optional[TransactionEntry]: """Get the destination transaction if there is anyone, otherwise ``None`` is returned. :return: A ``TransactionEntry`` or ``None``. """ if self.destination_uuid == NULL_UUID: return None destination = getattr(self._token, "old_value", Token.MISSING) if destination == Token.MISSING: destination = await self._transaction_repository.get(uuid=self.destination_uuid) return destination def __eq__(self, other: TransactionEntry) -> bool: return isinstance(other, type(self)) and tuple(self) == tuple(other) def __iter__(self) -> Iterable: # noinspection PyRedundantParentheses yield from ( self.uuid, self.status, self.event_offset, self.destination_uuid, ) def __repr__(self): return ( f"{type(self).__name__}(uuid={self.uuid!r}, status={self.status!r}, event_offset={self.event_offset!r}, " f"destination_uuid={self.destination_uuid!r}, updated_at={self.updated_at!r})" )
[docs] def as_raw(self) -> dict[str, Any]: """Get a raw representation of the instance. :return: A dictionary in which the keys are attribute names and values the attribute contents. """ return { "uuid": self.uuid, "status": self.status, "event_offset": self.event_offset, "destination_uuid": self.destination_uuid, "updated_at": self.updated_at, }
[docs]class TransactionStatus(str, Enum): """Transaction Status Enum.""" PENDING = "pending" RESERVING = "reserving" RESERVED = "reserved" COMMITTING = "committing" COMMITTED = "committed" REJECTED = "rejected"
[docs] @classmethod def value_of(cls, value: str) -> TransactionStatus: """Get the status based on its text representation.""" for item in cls.__members__.values(): if item.value == value: return item raise ValueError(f"The given value does not match with any enum items. Obtained {value}")