Source code for minos.saga.testing

from abc import (
    ABC,
    abstractmethod,
)
from uuid import (
    UUID,
)

from minos.common import (
    DatabaseOperation,
    DeclarativeModel,
)
from minos.common.testing import (
    MinosTestCase,
    MockedDatabaseClient,
    MockedDatabaseOperation,
)

from .context import (
    SagaContext,
)
from .definitions import (
    Saga,
)
from .exceptions import (
    SagaExecutionNotFoundException,
)
from .executions import (
    SagaExecution,
    SagaExecutionDatabaseOperationFactory,
    SagaExecutionRepository,
)


class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
    """Mocked ``SagaExecutionDatabaseOperationFactory`` meant to be used in tests.

    Every builder ignores its arguments and returns a ``MockedDatabaseOperation``
    created with the ``"create_table"`` label.
    """

    def build_store(self, uuid: UUID, **kwargs) -> DatabaseOperation:
        """Build the mocked operation returned for a store request.

        :param uuid: Identifier of the execution; not used by this mock.
        :param kwargs: Additional named arguments; not used by this mock.
        :return: A ``MockedDatabaseOperation`` instance.
        """
        operation = MockedDatabaseOperation("create_table")
        return operation

    def build_load(self, uuid: UUID) -> DatabaseOperation:
        """Build the mocked operation returned for a load request.

        :param uuid: Identifier of the execution; not used by this mock.
        :return: A ``MockedDatabaseOperation`` instance.
        """
        operation = MockedDatabaseOperation("create_table")
        return operation

    def build_delete(self, uuid: UUID) -> DatabaseOperation:
        """Build the mocked operation returned for a delete request.

        :param uuid: Identifier of the execution; not used by this mock.
        :return: A ``MockedDatabaseOperation`` instance.
        """
        operation = MockedDatabaseOperation("create_table")
        return operation
# Executed at import time: registers ``MockedSagaExecutionDatabaseOperationFactory`` on
# ``MockedDatabaseClient`` under the ``SagaExecutionDatabaseOperationFactory`` base class.
# NOTE(review): presumably this makes the mocked client resolve the base factory to the mocked
# implementation -- confirm against ``MockedDatabaseClient.set_factory``.
MockedDatabaseClient.set_factory( SagaExecutionDatabaseOperationFactory, MockedSagaExecutionDatabaseOperationFactory, )
class Foo(DeclarativeModel):
    """Minimal declarative model, for testing purposes."""

    # The only field declared on the model.
    foo: str
def _fn1(context: SagaContext) -> SagaContext:  # pragma: no cover
    """Set ``context["payment"]`` to ``"payment"`` (used as the local step of ``_SAGA``).

    :param context: The saga context, modified in place.
    :return: The same context instance that was received.
    """
    context["payment"] = "payment"
    return context


def _fn2(context: SagaContext) -> SagaContext:  # pragma: no cover
    """Set ``context["payment"]`` to ``None`` (used as the failure callback of ``_SAGA``).

    :param context: The saga context, modified in place.
    :return: The same context instance that was received.
    """
    context["payment"] = None
    return context


# Saga definition shared by the repository test case below: a single local step
# (``_fn1``) with ``_fn2`` registered through ``on_failure``, then committed.
_SAGA = (
    Saga()
    .local_step(_fn1)
    .on_failure(_fn2)
    .commit()
)
class SagaExecutionRepositoryTestCase(MinosTestCase, ABC):
    """Reusable test suite for ``SagaExecutionRepository`` implementations.

    Concrete subclasses provide the repository under test by implementing
    :meth:`build_saga_execution_repository`.

    ``__test__`` is set to ``False`` so that this abstract base is not collected as a test itself.
    NOTE(review): subclasses presumably have to re-enable collection (``__test__ = True``) --
    confirm against the test runner in use.
    """

    __test__ = False

    def setUp(self) -> None:
        """Run the parent set up and build the repository under test."""
        super().setUp()
        self.saga_execution_repository = self.build_saga_execution_repository()

    async def asyncSetUp(self) -> None:
        """Set up the repository and prepare the executions used by the tests.

        ``self.execution`` is created from ``_SAGA`` and executed with ``autocommit=False``;
        ``self.another`` is created from the same definition but is not executed.
        """
        await super().asyncSetUp()
        await self.saga_execution_repository.setup()

        execution = SagaExecution.from_definition(_SAGA)
        await execution.execute(autocommit=False)

        self.execution = execution
        self.another = SagaExecution.from_definition(_SAGA)

    async def asyncTearDown(self) -> None:
        """Destroy the repository and then run the parent's asynchronous tear down."""
        try:
            await self.saga_execution_repository.destroy()
        finally:
            # Run the parent tear down even when destroying the repository fails, so that
            # whatever the parent releases does not leak into the following tests.
            await super().asyncTearDown()

    @abstractmethod
    def build_saga_execution_repository(self) -> SagaExecutionRepository:
        """Build the repository instance to be exercised by the tests.

        :return: A ``SagaExecutionRepository`` instance.
        """

    async def test_store(self) -> None:
        """A stored execution is loaded back as an equal instance."""
        await self.saga_execution_repository.store(self.execution)

        self.assertEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))

    async def test_store_overwrite(self) -> None:
        """Storing another execution under the same identifier replaces the previous one."""
        await self.saga_execution_repository.store(self.execution)
        self.assertEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))

        # Reuse the identifier so that the second store targets the same entry.
        self.another.uuid = self.execution.uuid
        await self.saga_execution_repository.store(self.another)

        self.assertNotEqual(self.execution, await self.saga_execution_repository.load(self.execution.uuid))
        self.assertEqual(self.another, await self.saga_execution_repository.load(self.execution.uuid))

    async def test_load_from_str(self) -> None:
        """``load`` accepts the identifier given as a ``str``."""
        await self.saga_execution_repository.store(self.execution)

        self.assertEqual(self.execution, await self.saga_execution_repository.load(str(self.execution.uuid)))

    async def test_load_raises(self) -> None:
        """Loading an execution that was never stored raises ``SagaExecutionNotFoundException``."""
        with self.assertRaises(SagaExecutionNotFoundException):
            await self.saga_execution_repository.load(self.execution.uuid)

    async def test_delete(self) -> None:
        """A deleted execution can no longer be loaded."""
        await self.saga_execution_repository.store(self.execution)
        await self.saga_execution_repository.delete(self.execution)

        with self.assertRaises(SagaExecutionNotFoundException):
            await self.saga_execution_repository.load(self.execution.uuid)

    async def test_delete_from_str(self) -> None:
        """``delete`` accepts the identifier given as a ``str``."""
        await self.saga_execution_repository.store(self.execution)
        await self.saga_execution_repository.delete(str(self.execution.uuid))

        with self.assertRaises(SagaExecutionNotFoundException):
            await self.saga_execution_repository.load(self.execution.uuid)