from minos.common import (
MinosException,
)
class SagaException(MinosException):
    """Root exception type for saga-related errors.

    Adds nothing on top of ``MinosException``; it exists so that saga errors
    can be caught through a single common base class.
    """
class EmptySagaException(SagaException):
    """Raised when a saga has no steps."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        text = "A 'Saga' must have at least one step." if message is None else message
        super().__init__(text)
class SagaNotCommittedException(SagaException):
    """Raised on an attempt to execute a saga that has not been committed."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        if message is not None:
            text = message
        else:
            text = "A 'Saga' must be committed."
        super().__init__(text)
class SagaStepException(SagaException):
    """Common base class for exceptions about saga steps.

    Defines no behaviour of its own beyond what ``SagaException`` provides.
    """
class SagaNotDefinedException(SagaStepException):
    """Exception to be raised when the saga is not defined."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        if message is None:
            # The previous default text contained a duplicated word ("call call").
            message = "A 'SagaStep' must have a 'Saga' instance to call this method."
        super().__init__(message)
class EmptySagaStepException(SagaStepException):
    """Raised when a saga step has no defined action."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        super().__init__(
            "A 'SagaStep' must have at least one defined action." if message is None else message
        )
class MultipleOnExecuteException(SagaStepException):
    """Raised when more than one ``on_execute`` method is defined on a step."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        text = message
        if text is None:
            text = "A 'SagaStep' can only define one 'on_execute' method."
        super().__init__(text)
class MultipleOnFailureException(SagaStepException):
    """Raised when more than one ``on_failure`` method is defined on a step."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        text = "A 'SagaStep' can only define one 'on_failure' method." if message is None else message
        super().__init__(text)
class MultipleOnSuccessException(SagaStepException):
    """Raised when more than one ``on_success`` method is defined on a step."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        if message is not None:
            text = message
        else:
            text = "A 'SagaStep' can only define one 'on_success' method."
        super().__init__(text)
class MultipleOnErrorException(SagaStepException):
    """Raised when more than one ``on_error`` method is defined on a step."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        super().__init__(
            "A 'SagaStep' can only define one 'on_error' method." if message is None else message
        )
class MultipleElseThenException(SagaStepException):
    """Raised when more than one ``else_then`` alternative is defined."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        text = message
        if text is None:
            text = "A 'ConditionalSagaStep' can only define one 'else_then' method."
        super().__init__(text)
class AlreadyOnSagaException(SagaStepException):
    """Raised when a saga step already belongs to another saga."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        text = "A 'SagaStep' can only belong to one 'Saga' simultaneously." if message is None else message
        super().__init__(text)
class UndefinedOnExecuteException(SagaStepException):
    """Raised when a step does not define its ``on_execute`` logic."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        if message is not None:
            text = message
        else:
            text = "A 'SagaStep' must define at least the 'on_execute' logic."
        super().__init__(text)
class SagaExecutionException(SagaException):
    """Common base class for exceptions about saga executions.

    Defines no behaviour of its own beyond what ``SagaException`` provides.
    """
class SagaExecutionNotFoundException(SagaExecutionException):
    """Raised when a saga execution cannot be found."""
class SagaRollbackExecutionException(SagaExecutionException):
    """Raised when a saga execution cannot be rolled back."""
class SagaFailedExecutionException(SagaExecutionException):
    """Raised when a saga execution fails while running."""

    def __init__(self, exception: Exception, message: str = None):
        """Store the given exception and build the error text.

        :param exception: Exception to report; kept on ``self.exception`` and
            rendered with ``repr`` in the default message.
        :param message: Custom error text; ``None`` selects the default one.
        """
        # Assigned before delegating to the parent, as in the original ordering.
        self.exception = exception
        if message is not None:
            text = message
        else:
            text = f"There was a failure while 'SagaStepExecution' was executing: {exception!r}"
        super().__init__(text)
class SagaExecutionAlreadyExecutedException(SagaExecutionException):
    """Raised when a saga execution cannot be executed."""
class SagaStepExecutionException(SagaException):
    """Common base class for exceptions about saga execution steps.

    Defines no behaviour of its own beyond what ``SagaException`` provides.
    """
class SagaFailedExecutionStepException(SagaStepExecutionException, SagaFailedExecutionException):
    """Raised when a saga execution step fails while running.

    Derives from both ``SagaStepExecutionException`` and
    ``SagaFailedExecutionException`` and declares no constructor of its own.
    """
class SagaPausedExecutionStepException(SagaStepExecutionException):
    """Raised when a saga execution step is paused."""

    def __init__(self, message: str = None):
        """Build the exception, using a default text when none is supplied.

        :param message: Custom error text; ``None`` selects the default one.
        """
        super().__init__(
            "There was a pause while 'SagaStepExecution' was executing." if message is None else message
        )
class SagaRollbackExecutionStepException(SagaStepExecutionException):
    """Raised when a saga execution step fails while performing a rollback."""
class AlreadyCommittedException(SagaException):
    """Raised on an attempt to modify a saga that is already committed."""
class ExecutorException(SagaException):
    """Raised when a saga executor raises some exception."""

    def __init__(self, exception: Exception, message: str = None):
        """Store the given exception and build the error text.

        :param exception: Exception to report; kept on ``self.exception`` and
            rendered with ``repr`` in the default message.
        :param message: Custom error text; ``None`` selects the default one.
        """
        # Assigned before delegating to the parent, as in the original ordering.
        self.exception = exception
        text = (
            f"There was a failure while 'SagaStepExecution' was executing: {exception!r}"
            if message is None
            else message
        )
        super().__init__(text)
class SagaFailedCommitCallbackException(SagaFailedExecutionException):
    """Raised when a saga commit callback raises some exception.

    Declares no constructor of its own, so it is built the same way as
    ``SagaFailedExecutionException``.
    """
class SagaResponseException(SagaException):
    """Used when a ``CommandStatus`` is anything other than ``SUCCESS``."""