Source code for minos.saga.executions.executors.abc

import inspect
from typing import (
    Any,
    Callable,
)
from uuid import (
    UUID,
)

from minos.aggregate import (
    TransactionEntry,
)

from ...definitions import (
    SagaOperation,
)
from ...exceptions import (
    ExecutorException,
)


[docs]class Executor: """Executor class."""
[docs] def __init__(self, execution_uuid: UUID, *args, **kwargs): self.execution_uuid = execution_uuid
[docs] async def exec(self, operation: SagaOperation, *args, **kwargs) -> Any: """Execute the given operation. :param operation: The operation to be executed. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: The execution response. """ if operation.parameterized: kwargs = {**operation.parameters, **kwargs} return await self.exec_function(operation.callback, *args, **kwargs)
[docs] async def exec_function(self, func: Callable, *args, **kwargs) -> Any: """Execute a function. :param func: Function to be executed. :param args: Additional positional arguments to the function. :param kwargs: Additional named arguments to the function. :return: The ``func`` result. """ try: async with TransactionEntry(uuid=self.execution_uuid, autocommit=False): result = func(*args, **kwargs) if inspect.isawaitable(result): result = await result except Exception as exc: raise ExecutorException(exc) return result