from datetime import (
datetime,
)
from typing import (
AsyncIterator,
Optional,
)
from uuid import (
UUID,
)
from minos.common import (
current_datetime,
)
from ...exceptions import (
TransactionRepositoryConflictException,
)
from ..entries import (
TransactionEntry,
)
from ..entries import TransactionStatus as s
from .abc import (
TransactionRepository,
)
[docs]class InMemoryTransactionRepository(TransactionRepository):
"""In Memory Transaction Repository class."""
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._storage = dict()
async def _submit(self, transaction: TransactionEntry) -> TransactionEntry:
transaction.updated_at = current_datetime()
if transaction.uuid in self._storage:
status = self._storage[transaction.uuid].status
if (
(status == s.PENDING and transaction.status not in (s.PENDING, s.RESERVING, s.REJECTED))
or (status == s.RESERVING and transaction.status not in (s.RESERVED, s.REJECTED))
or (status == s.RESERVED and transaction.status not in (s.COMMITTING, s.REJECTED))
or (status == s.COMMITTING and transaction.status not in (s.COMMITTED,))
or (status == s.COMMITTED)
or (status == s.REJECTED)
):
raise TransactionRepositoryConflictException(
f"{transaction!r} status is invalid respect to the previous one."
)
self._storage[transaction.uuid] = TransactionEntry(
uuid=transaction.uuid,
destination_uuid=transaction.destination_uuid,
status=transaction.status,
event_offset=transaction.event_offset,
updated_at=transaction.updated_at,
event_repository=transaction._event_repository,
transaction_repository=transaction._transaction_repository,
)
return transaction
async def _select(
self,
uuid: Optional[UUID] = None,
uuid_ne: Optional[UUID] = None,
uuid_in: Optional[tuple[UUID, ...]] = None,
destination_uuid: Optional[UUID] = None,
status: Optional[s] = None,
status_in: Optional[tuple[str, ...]] = None,
event_offset: Optional[int] = None,
event_offset_lt: Optional[int] = None,
event_offset_gt: Optional[int] = None,
event_offset_le: Optional[int] = None,
event_offset_ge: Optional[int] = None,
updated_at: Optional[datetime] = None,
updated_at_lt: Optional[datetime] = None,
updated_at_gt: Optional[datetime] = None,
updated_at_le: Optional[datetime] = None,
updated_at_ge: Optional[datetime] = None,
**kwargs,
) -> AsyncIterator[TransactionEntry]:
# noinspection DuplicatedCode
def _fn_filter(transaction: TransactionEntry) -> bool:
if uuid is not None and uuid != transaction.uuid:
return False
if uuid_ne is not None and uuid_ne == transaction.uuid:
return False
if uuid_in is not None and transaction.uuid not in uuid_in:
return False
if destination_uuid is not None and destination_uuid != transaction.destination_uuid:
return False
if status is not None and status != transaction.status:
return False
if status_in is not None and transaction.status not in status_in:
return False
if event_offset is not None and event_offset != transaction.event_offset:
return False
if event_offset_lt is not None and event_offset_lt <= transaction.event_offset:
return False
if event_offset_gt is not None and event_offset_gt >= transaction.event_offset:
return False
if event_offset_le is not None and event_offset_le < transaction.event_offset:
return False
if event_offset_ge is not None and event_offset_ge > transaction.event_offset:
return False
if updated_at is not None and updated_at != transaction.updated_at:
return False
if updated_at_lt is not None and updated_at_lt <= transaction.updated_at:
return False
if updated_at_gt is not None and updated_at_gt >= transaction.updated_at:
return False
if updated_at_le is not None and updated_at_le < transaction.updated_at:
return False
if updated_at_ge is not None and updated_at_ge > transaction.updated_at:
return False
return True
iterable = iter(self._storage.values())
iterable = filter(_fn_filter, iterable)
for item in iterable:
yield item