Source code for minos.saga.exceptions

from minos.common import (
    MinosException,
)


class SagaException(MinosException):
    """Common ancestor of the saga exceptions defined in this module."""
class EmptySagaException(SagaException):
    """Raised to signal that a saga is empty."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that a saga
            needs at least one step is used instead.
        """
        super().__init__("A 'Saga' must have at least one step." if message is None else message)
class SagaNotCommittedException(SagaException):
    """Raised on an attempt to execute a saga that has not been committed."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that the saga
            must be committed is used instead.
        """
        text = message if message is not None else "A 'Saga' must be committed."
        super().__init__(text)
class SagaStepException(SagaException):
    """Common ancestor of the exceptions related to saga steps."""
class SagaNotDefinedException(SagaStepException):
    """Exception to be raised when the saga is not defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that the step
            needs a ``Saga`` instance is used instead.
        """
        if message is None:
            # The previous default text contained a duplicated word ("call call").
            message = "A 'SagaStep' must have a 'Saga' instance to call this method."
        super().__init__(message)
class EmptySagaStepException(SagaStepException):
    """Raised to signal that a saga step is empty."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that a step
            needs at least one defined action is used instead.
        """
        super().__init__("A 'SagaStep' must have at least one defined action." if message is None else message)
class MultipleOnExecuteException(SagaStepException):
    """Raised when more than one on-execute method is defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text about the single
            ``on_execute`` restriction is used instead.
        """
        text = message if message is not None else "A 'SagaStep' can only define one 'on_execute' method."
        super().__init__(text)
class MultipleOnFailureException(SagaStepException):
    """Raised when more than one on-failure method is defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text about the single
            ``on_failure`` restriction is used instead.
        """
        super().__init__("A 'SagaStep' can only define one 'on_failure' method." if message is None else message)
class MultipleOnSuccessException(SagaStepException):
    """Raised when more than one on-success method is defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text about the single
            ``on_success`` restriction is used instead.
        """
        text = message if message is not None else "A 'SagaStep' can only define one 'on_success' method."
        super().__init__(text)
class MultipleOnErrorException(SagaStepException):
    """Raised when more than one on-error method is defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text about the single
            ``on_error`` restriction is used instead.
        """
        super().__init__("A 'SagaStep' can only define one 'on_error' method." if message is None else message)
class MultipleElseThenException(SagaStepException):
    """Raised when more than one else-then alternative is defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text about the single
            ``else_then`` restriction is used instead.
        """
        text = message if message is not None else "A 'ConditionalSagaStep' can only define one 'else_then' method."
        super().__init__(text)
class AlreadyOnSagaException(SagaStepException):
    """Raised when a saga step already belongs to another saga."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that a step
            can only belong to one saga at a time is used instead.
        """
        super().__init__("A 'SagaStep' can only belong to one 'Saga' simultaneously." if message is None else message)
class UndefinedOnExecuteException(SagaStepException):
    """Raised when the on-execute method has not been defined."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text stating that the
            ``on_execute`` logic is required is used instead.
        """
        text = message if message is not None else "A 'SagaStep' must define at least the 'on_execute' logic."
        super().__init__(text)
class SagaExecutionException(SagaException):
    """Common ancestor of the exceptions related to saga executions."""
class SagaExecutionNotFoundException(SagaExecutionException):
    """Raised when a saga execution cannot be found."""
class SagaRollbackExecutionException(SagaExecutionException):
    """Raised when a saga execution cannot be rolled back."""
class SagaFailedExecutionException(SagaExecutionException):
    """Raised when a saga execution failed while running."""

    def __init__(self, exception: Exception, message: str = None):
        """Initialize the exception, keeping a reference to the underlying failure.

        :param exception: The exception to wrap; stored on the ``exception`` attribute.
        :param message: Custom error text. When ``None``, a default text embedding the ``repr``
            of ``exception`` is used instead.
        """
        # Stored before the message is built, so the attribute is set ahead of the repr call.
        self.exception = exception
        super().__init__(
            f"There was a failure while 'SagaStepExecution' was executing: {exception!r}"
            if message is None
            else message
        )
class SagaExecutionAlreadyExecutedException(SagaExecutionException):
    """Raised when a saga execution cannot be executed."""
class SagaStepExecutionException(SagaException):
    """Common ancestor of the exceptions related to saga execution steps."""
class SagaFailedExecutionStepException(SagaStepExecutionException, SagaFailedExecutionException):
    """Raised when a saga execution step failed while running.

    Defines no constructor of its own; both a step-execution and a failed-execution exception.
    """
class SagaPausedExecutionStepException(SagaStepExecutionException):
    """Raised when a saga execution step is paused."""

    def __init__(self, message: str = None):
        """Initialize the exception, falling back to a default message.

        :param message: Custom error text. When ``None``, a default text reporting the pause
            is used instead.
        """
        super().__init__("There was a pause while 'SagaStepExecution' was executing." if message is None else message)
class SagaRollbackExecutionStepException(SagaStepExecutionException):
    """Raised when a saga execution step failed while performing a rollback."""
class AlreadyCommittedException(SagaException):
    """Raised on an attempt to modify an already committed saga."""
class ExecutorException(SagaException):
    """Raised when a saga executor raises some exception."""

    def __init__(self, exception: Exception, message: str = None):
        """Initialize the exception, keeping a reference to the underlying failure.

        :param exception: The exception to wrap; stored on the ``exception`` attribute.
        :param message: Custom error text. When ``None``, a default text embedding the ``repr``
            of ``exception`` is used instead.
        """
        # Stored before the message is built, so the attribute is set ahead of the repr call.
        self.exception = exception
        text = (
            message
            if message is not None
            else f"There was a failure while 'SagaStepExecution' was executing: {exception!r}"
        )
        super().__init__(text)
class SagaFailedCommitCallbackException(SagaFailedExecutionException):
    """Raised when a saga commit callback raises some exception."""
class SagaResponseException(SagaException):
    """Used when ``CommandStatus`` is not ``SUCCESS``."""