Source code for minos.saga.executions.steps.remote

from __future__ import (
    annotations,
)

from typing import (
    Optional,
)

from ...context import (
    SagaContext,
)
from ...definitions import (
    RemoteSagaStep,
)
from ...exceptions import (
    SagaFailedExecutionStepException,
    SagaPausedExecutionStepException,
    SagaResponseException,
    SagaRollbackExecutionStepException,
)
from ...messages import (
    SagaResponse,
    SagaResponseStatus,
)
from ...utils import (
    get_service_name,
)
from ..executors import (
    RequestExecutor,
    ResponseExecutor,
)
from ..status import (
    SagaStepStatus,
)
from .abc import (
    SagaStepExecution,
)


[docs]class RemoteSagaStepExecution(SagaStepExecution): """Saga Execution Step class.""" definition: RemoteSagaStep
[docs] async def execute( self, context: SagaContext, response: Optional[SagaResponse] = None, *args, **kwargs ) -> SagaContext: """Execution the remote step. :param context: The execution context to be used during the execution. :param response: An optional command response instance (to be consumed by the on_success method). :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: The updated context. """ await self._execute_on_execute(context, *args, **kwargs) if response is None: self.status = SagaStepStatus.PausedByOnExecute raise SagaPausedExecutionStepException() self.related_services |= response.related_services if response.status == SagaResponseStatus.SYSTEM_ERROR: self.status = SagaStepStatus.ErroredByOnExecute exc = SagaResponseException(f"Failed with {response.status!s} status: {await response.content()!s}") raise SagaFailedExecutionStepException(exc) if response.status == SagaResponseStatus.SUCCESS: context = await self._execute_on_success(context, response, *args, **kwargs) else: context = await self._execute_on_error(context, response, *args, **kwargs) self.status = SagaStepStatus.Finished return context
async def _execute_on_execute(self, context: SagaContext, *args, **kwargs) -> None: if self.status != SagaStepStatus.Created: return self.status = SagaStepStatus.RunningOnExecute executor = RequestExecutor(*args, **kwargs) self.related_services.add(get_service_name()) try: await executor.exec(self.definition.on_execute_operation, context) except SagaFailedExecutionStepException as exc: self.status = SagaStepStatus.ErroredOnExecute raise exc self.status = SagaStepStatus.FinishedOnExecute async def _execute_on_success(self, context: SagaContext, response: SagaResponse, *args, **kwargs) -> SagaContext: self.status = SagaStepStatus.RunningOnSuccess executor = ResponseExecutor(*args, **kwargs) self.related_services.add(get_service_name()) try: context = await executor.exec(self.definition.on_success_operation, context, response) except SagaFailedExecutionStepException as exc: self.status = SagaStepStatus.ErroredOnSuccess await self.rollback(context, *args, **kwargs) raise exc return context async def _execute_on_error(self, context: SagaContext, response: SagaResponse, *args, **kwargs) -> SagaContext: self.status = SagaStepStatus.RunningOnError executor = ResponseExecutor(*args, **kwargs) self.related_services.add(get_service_name()) try: context = await executor.exec(self.definition.on_error_operation, context, response) except SagaFailedExecutionStepException as exc: self.status = SagaStepStatus.ErroredOnError await self.rollback(context, *args, **kwargs) raise exc return context
[docs] async def rollback(self, context: SagaContext, *args, **kwargs) -> SagaContext: """Revert the executed remote step. :param context: Execution context. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: The updated execution context. """ if self.status == SagaStepStatus.Created: raise SagaRollbackExecutionStepException("There is nothing to rollback.") if self.already_rollback: raise SagaRollbackExecutionStepException("The step was already rollbacked.") executor = RequestExecutor(*args, **kwargs) await executor.exec(self.definition.on_failure_operation, context) self.already_rollback = True return context