Source code for minos.saga.manager

from __future__ import (
    annotations,
)

import logging
import warnings
from functools import (
    reduce,
)
from operator import (
    or_,
)
from typing import (
    Optional,
    Union,
)
from uuid import (
    UUID,
)

from minos.common import (
    Config,
    Inject,
    Injectable,
    NotProvidedException,
    PoolFactory,
    SetupMixin,
)
from minos.networks import (
    REQUEST_HEADERS_CONTEXT_VAR,
    REQUEST_USER_CONTEXT_VAR,
    BrokerClient,
    BrokerClientPool,
    BrokerMessage,
)

from .context import (
    SagaContext,
)
from .definitions import (
    Saga,
)
from .exceptions import (
    SagaFailedExecutionException,
    SagaPausedExecutionStepException,
)
from .executions import (
    DatabaseSagaExecutionRepository,
    SagaExecution,
    SagaExecutionRepository,
    SagaStatus,
)
from .messages import (
    SagaResponse,
)

logger = logging.getLogger(__name__)


[docs]@Injectable("saga_manager") class SagaManager(SetupMixin): """Saga Manager implementation class. The purpose of this class is to manage the running process for new or paused``SagaExecution`` instances. """
[docs] @Inject() def __init__( self, storage: SagaExecutionRepository, broker_pool: Optional[BrokerClientPool] = None, pool_factory: Optional[PoolFactory] = None, *args, **kwargs, ): super().__init__(*args, **kwargs) self.storage = storage if broker_pool is None and pool_factory is not None: broker_pool = pool_factory.get_pool("broker") if broker_pool is None: raise NotProvidedException(f"A {BrokerClientPool!r} instance is required.") self.broker_pool = broker_pool
@classmethod def _from_config(cls, config: Config, **kwargs) -> SagaManager: """Build an instance from config. :param args: Additional positional arguments. :param config: Config instance. :param kwargs: Additional named arguments. :return: A new ``SagaManager`` instance. """ storage = DatabaseSagaExecutionRepository.from_config(config, **kwargs) return cls(storage=storage, **kwargs) async def _setup(self) -> None: await super()._setup() await self.storage.setup() async def _destroy(self) -> None: await self.storage.destroy() await super()._destroy()
[docs] async def run( self, definition: Optional[Saga] = None, context: Optional[SagaContext] = None, *, response: Optional[SagaResponse] = None, user: Optional[UUID] = None, autocommit: bool = True, pause_on_disk: bool = False, raise_on_error: bool = True, return_execution: bool = True, **kwargs, ) -> Union[UUID, SagaExecution]: """Perform a run of a ``Saga``. The run can be a new one (if a name is provided) or continue execution a previous one (if a reply is provided). :param definition: Saga definition to be executed. :param context: Initial context to be used during the execution. (Only used for new executions) :param response: The reply that relaunches a saga execution. :param user: The user identifier to be injected on remote steps. :param autocommit: If ``True`` the transactions are committed/rejected automatically. Otherwise, the ``commit`` or ``reject`` must to be called manually. :param pause_on_disk: If ``True`` the pauses until remote steps' responses are paused on disk (background, non-blocking the execution). Otherwise, the pauses are waited on memory (online, blocking the execution) :param raise_on_error: If ``True`` exceptions are raised on error. Otherwise, the execution is returned normally but with ``Errored`` status. :param return_execution: If ``True`` the ``SagaExecution`` instance is returned. Otherwise, only the identifier (``UUID``) is returned. :param kwargs: Additional named arguments. :return: This method does not return anything. """ if response is not None: return await self._load_and_run( response=response, autocommit=autocommit, pause_on_disk=pause_on_disk, raise_on_error=raise_on_error, return_execution=return_execution, **kwargs, ) return await self._run_new( definition=definition, context=context, user=user, autocommit=autocommit, pause_on_disk=pause_on_disk, raise_on_error=raise_on_error, return_execution=return_execution, **kwargs, )
async def _run_new( self, definition: Saga, context: Optional[SagaContext] = None, user: Optional[UUID] = None, **kwargs ) -> Union[UUID, SagaExecution]: if REQUEST_USER_CONTEXT_VAR.get() is not None: if user is not None: warnings.warn("The `user` Argument will be ignored in favor of the `user` ContextVar", RuntimeWarning) user = REQUEST_USER_CONTEXT_VAR.get() execution = SagaExecution.from_definition(definition, context=context, user=user) return await self._run(execution, **kwargs) async def _load_and_run(self, response: SagaResponse, **kwargs) -> Union[UUID, SagaExecution]: execution = await self.storage.load(response.uuid) return await self._run(execution, response=response, **kwargs) async def _run( self, execution: SagaExecution, pause_on_disk: bool = False, raise_on_error: bool = True, return_execution: bool = True, **kwargs, ) -> Union[UUID, SagaExecution]: try: if pause_on_disk: await self._run_with_pause_on_disk(execution, **kwargs) else: await self._run_with_pause_on_memory(execution, **kwargs) except SagaFailedExecutionException as exc: await self.storage.store(execution) if raise_on_error: raise exc logger.exception(f"The execution identified by {execution.uuid!s} failed") finally: if (headers := REQUEST_HEADERS_CONTEXT_VAR.get()) is not None: related_services = reduce(or_, (s.related_services for s in execution.executed_steps), set()) if execution.paused_step is not None: related_services.update(execution.paused_step.related_services) if raw_related_services := headers.get("related_services"): related_services.update(raw_related_services.split(",")) headers["related_services"] = ",".join(related_services) if execution.status == SagaStatus.Finished: await self.storage.delete(execution) if return_execution: return execution return execution.uuid async def _run_with_pause_on_disk(self, execution: SagaExecution, autocommit: bool = True, **kwargs) -> None: try: await execution.execute(autocommit=False, **kwargs) if autocommit: await execution.commit(**kwargs) except SagaPausedExecutionStepException: await self.storage.store(execution) except SagaFailedExecutionException as exc: if autocommit: await execution.reject(**kwargs) raise exc async def _run_with_pause_on_memory( self, execution: SagaExecution, response: Optional[SagaResponse] = None, autocommit: bool = True, **kwargs ) -> None: try: # noinspection PyUnresolvedReferences async with self.broker_pool.acquire() as broker: while execution.status in (SagaStatus.Created, SagaStatus.Paused): try: await execution.execute(response=response, autocommit=False, **kwargs) except SagaPausedExecutionStepException: response = await self._get_response(broker, execution, **kwargs) await self.storage.store(execution) if autocommit: await execution.commit(**kwargs) except SagaFailedExecutionException as exc: if autocommit: await execution.reject(**kwargs) raise exc @staticmethod async def _get_response(broker: BrokerClient, execution: SagaExecution, **kwargs) -> SagaResponse: message: Optional[BrokerMessage] = None while message is None or "saga" not in message.headers or UUID(message.headers["saga"]) != execution.uuid: try: message = await broker.receive(**kwargs) except Exception as exc: execution.status = SagaStatus.Errored raise SagaFailedExecutionException(exc) return SagaResponse.from_message(message)