Source code for minos.saga.definitions.saga

from __future__ import (
    annotations,
)

import warnings
from typing import (
    Any,
    Iterable,
    Optional,
    Type,
    TypeVar,
    Union,
)

from ..exceptions import (
    AlreadyCommittedException,
    AlreadyOnSagaException,
    EmptySagaException,
    SagaNotCommittedException,
)
from .operations import (
    SagaOperation,
)
from .steps import (
    ConditionalSagaStep,
    LocalSagaStep,
    RemoteSagaStep,
    SagaStep,
)
from .types import (
    LocalCallback,
    RequestCallBack,
)


[docs]class Saga: """Saga class. The purpose of this class is to define a sequence of operations among microservices. """ # noinspection PyUnusedLocal
[docs] def __init__(self, *args, steps: list[SagaStep] = None, committed: bool = False, commit: None = None, **kwargs): if steps is None: steps = list() self.steps = steps self.committed = committed if commit is not None: warnings.warn(f"Commit callback is being deprecated. Use {self.local_step!r} instead", DeprecationWarning) self.local_step(commit) self.committed = True
[docs] @classmethod def from_raw(cls, raw: Union[dict[str, Any], Saga], **kwargs) -> Saga: """Build a new ``Saga`` instance from raw. :param raw: The raw representation of the saga. :param kwargs: Additional named arguments. :return: A new ``Saga`` instance. """ if isinstance(raw, cls): return raw current = raw | kwargs steps = [SagaStep.from_raw(step) for step in current.pop("steps")] instance = cls(steps=steps, **current) return instance
[docs] def conditional_step(self, step: Optional[ConditionalSagaStep] = None) -> ConditionalSagaStep: """Add a new conditional step. :param step: The step to be added. If `None` is provided then a new one will be created. :return: A ``SagaStep`` instance. """ return self._add_step(ConditionalSagaStep, step)
[docs] def local_step( self, step: Optional[Union[LocalCallback, SagaOperation[LocalCallback], LocalSagaStep]] = None, **kwargs ) -> LocalSagaStep: """Add a new local step. :param step: The step to be added. If `None` is provided then a new one will be created. :param kwargs: Additional named parameters. :return: A ``SagaStep`` instance. """ if step is not None and not isinstance(step, SagaStep) and not isinstance(step, SagaOperation): step = SagaOperation(step, **kwargs) return self._add_step(LocalSagaStep, step)
[docs] def step( self, step: Optional[Union[RequestCallBack, SagaOperation[RequestCallBack], RemoteSagaStep]] = None, **kwargs ) -> RemoteSagaStep: """Add a new remote step step. :param step: The step to be added. If `None` is provided then a new one will be created. :param kwargs: Additional named parameters. :return: A ``SagaStep`` instance. """ warnings.warn("step() method is deprecated by remote_step() and will be removed soon.", DeprecationWarning) return self.remote_step(step, **kwargs)
[docs] def remote_step( self, step: Optional[Union[RequestCallBack, SagaOperation[RequestCallBack], RemoteSagaStep]] = None, **kwargs ) -> RemoteSagaStep: """Add a new remote step step. :param step: The step to be added. If `None` is provided then a new one will be created. :param kwargs: Additional named parameters. :return: A ``SagaStep`` instance. """ if step is not None and not isinstance(step, SagaStep) and not isinstance(step, SagaOperation): step = SagaOperation(step, **kwargs) return self._add_step(RemoteSagaStep, step)
def _add_step(self, step_cls: Type[T], step: Optional[Union[SagaOperation, T]]) -> T: if self.committed: raise AlreadyCommittedException("It is not possible to add more steps to an already committed saga.") if step is None: step = step_cls(saga=self) elif isinstance(step, SagaStep): if not isinstance(step, step_cls): raise TypeError(f"The given step does not match the requested type: {step_cls} vs {type(step)}") if step.saga is not None: raise AlreadyOnSagaException() step.saga = self else: step = step_cls(step, saga=self) self.steps.append(step) return step @property def raw(self) -> dict[str, Any]: """Generate a raw representation of the instance. :return: A ``dict`` instance. """ ans = { "steps": [step.raw for step in self.steps], "committed": self.committed, } return ans def __eq__(self, other: SagaStep) -> bool: return type(self) == type(other) and tuple(self) == tuple(other) def __iter__(self) -> Iterable: yield from ( self.steps, self.committed, )
[docs] def commit(self, callback: None = None, **kwargs) -> Saga: """Commit the instance to be ready for execution. :param callback: Deprecated argument. :param kwargs: Additional named arguments. :return: A ``Saga`` instance. """ if self.committed: raise AlreadyCommittedException("It is not possible to commit a saga multiple times.") if callback is not None: warnings.warn(f"Commit callback is being deprecated. Use {self.local_step!r} instead", DeprecationWarning) self.local_step(callback, **kwargs) self.committed = True return self
[docs] def validate(self) -> None: """Check if the saga is valid. :return: This method does not return anything, but raises an exception if the saga is not valid. """ if not len(self.steps): raise EmptySagaException() for step in self.steps: step.validate() if not self.committed: raise SagaNotCommittedException()
T = TypeVar("T")