Source code for minos.aggregate.transactions.services

import logging
from datetime import (
    timedelta,
)

from minos.common import (
    Config,
    Inject,
    current_datetime,
)
from minos.networks import (
    EnrouteDecorator,
    Request,
    ResponseException,
    enroute,
)

from ..exceptions import (
    EventRepositoryConflictException,
    TransactionNotFoundException,
    TransactionRepositoryConflictException,
)
from .entries import (
    TransactionStatus,
)
from .repositories import (
    TransactionRepository,
)

logger = logging.getLogger(__name__)


[docs]class TransactionService: """Snapshot Service class.""" # noinspection PyUnusedLocal
[docs] @Inject() def __init__(self, *args, transaction_repository: TransactionRepository, **kwargs): self.transaction_repository = transaction_repository
@classmethod def __get_enroute__(cls, config: Config) -> dict[str, set[EnrouteDecorator]]: name = config.get_name() return { cls.__reserve__.__name__: {enroute.broker.command(f"_Reserve{name.title()}Transaction")}, cls.__reject__.__name__: {enroute.broker.command(f"_Reject{name.title()}Transaction")}, cls.__commit__.__name__: {enroute.broker.command(f"_Commit{name.title()}Transaction")}, cls.__reject_blocked__.__name__: {enroute.periodic.event("* * * * *")}, } async def __reserve__(self, request: Request) -> None: uuid = await request.content() try: transaction = await self.transaction_repository.get(uuid) except TransactionNotFoundException: raise ResponseException(f"The transaction identified by {uuid!r} does not exist.") try: await transaction.reserve() except EventRepositoryConflictException: raise ResponseException("The transaction could not be reserved.") async def __reject__(self, request: Request) -> None: uuid = await request.content() try: transaction = await self.transaction_repository.get(uuid) except TransactionNotFoundException: raise ResponseException(f"The transaction identified by {uuid!r} does not exist.") if transaction.status == TransactionStatus.REJECTED: return try: await transaction.reject() except TransactionRepositoryConflictException as exc: raise ResponseException(f"{exc!s}") async def __commit__(self, request: Request) -> None: uuid = await request.content() try: transaction = await self.transaction_repository.get(uuid) except TransactionNotFoundException: raise ResponseException(f"The transaction identified by {uuid!r} does not exist.") try: await transaction.commit() except TransactionRepositoryConflictException as exc: raise ResponseException(f"{exc!s}") # noinspection PyUnusedLocal async def __reject_blocked__(self, request: Request) -> None: status_in = (TransactionStatus.RESERVED,) updated_at_lt = current_datetime() - timedelta(minutes=1) async for transaction in self.transaction_repository.select(status_in=status_in, updated_at_lt=updated_at_lt): logger.info(f"Rejecting {transaction.uuid!r} transaction as it has been reserved for a long time...") try: await transaction.reject() except TransactionRepositoryConflictException as exc: logger.warning(f"Raised an exception while trying to reject a blocked transaction: {exc!r}")