Source code for minos.networks.brokers.dispatchers.impl

from __future__ import (
    annotations,
)

import logging
import traceback
from collections.abc import (
    Awaitable,
    Callable,
)
from functools import (
    wraps,
)
from inspect import (
    isawaitable,
)
from typing import (
    Optional,
    Union,
)

from minos.common import (
    Config,
    Inject,
    NotProvidedException,
    SetupMixin,
)

from ...decorators import (
    EnrouteFactory,
)
from ...exceptions import (
    MinosActionNotFoundException,
)
from ...requests import (
    REQUEST_USER_CONTEXT_VAR,
    Request,
    Response,
    ResponseException,
)
from ..messages import (
    REQUEST_HEADERS_CONTEXT_VAR,
    BrokerMessage,
    BrokerMessageV1,
    BrokerMessageV1Payload,
    BrokerMessageV1Status,
)
from ..publishers import (
    BrokerPublisher,
)
from .requests import (
    BrokerRequest,
    BrokerResponse,
)

logger = logging.getLogger(__name__)


[docs]class BrokerDispatcher(SetupMixin): """Broker Dispatcher class."""
[docs] def __init__(self, actions: dict[str, Optional[Callable]], publisher: BrokerPublisher, **kwargs): super().__init__(**kwargs) self._actions = actions self._publisher = publisher
@classmethod def _from_config(cls, config: Config, **kwargs) -> BrokerDispatcher: kwargs["actions"] = cls._get_actions(config, **kwargs) kwargs["publisher"] = cls._get_publisher(**kwargs) # noinspection PyProtectedMember return cls(**kwargs) @staticmethod def _get_actions( config: Config, handlers: dict[str, Optional[Callable]] = None, **kwargs ) -> dict[str, Callable[[BrokerRequest], Awaitable[Optional[BrokerResponse]]]]: if handlers is None: builder = EnrouteFactory(*config.get_services(), middleware=config.get_middleware()) decorators = builder.get_broker_command_query_event(config=config, **kwargs) handlers = {decorator.topic: fn for decorator, fn in decorators.items()} return handlers # noinspection PyUnusedLocal @staticmethod @Inject() def _get_publisher( publisher: Optional[BrokerPublisher] = None, broker_publisher: BrokerPublisher = None, **kwargs, ) -> BrokerPublisher: if publisher is None: publisher = broker_publisher if publisher is None: raise NotProvidedException(f"A {BrokerPublisher!r} object must be provided.") return publisher @property def publisher(self) -> BrokerPublisher: """Get the publisher instance. :return: A ``BrokerPublisher`` instance. """ return self._publisher @property def actions(self) -> dict[str, Optional[Callable]]: """Actions getter. :return: A dictionary in which the keys are topics and the values are the handler. """ return self._actions
[docs] async def dispatch(self, message: BrokerMessage) -> None: """Dispatch an entry. :param message: The entry to be dispatched. :return: This method does not return anything. """ action = self.get_action(message.topic) fn = self.get_callback(action) payload = await fn(message) if message.should_reply: reply = BrokerMessageV1(topic=message.reply_topic, payload=payload, identifier=message.identifier) await self.publisher.send(reply)
[docs] @staticmethod def get_callback( fn: Callable[[BrokerRequest], Union[Optional[BrokerResponse], Awaitable[Optional[BrokerResponse]]]] ) -> Callable[[BrokerMessage], Awaitable[BrokerMessageV1Payload]]: """Get the handler function to be used by the Broker Handler. :param fn: The action function. :return: A wrapper function around the given one that is compatible with the Broker Handler API. """ @wraps(fn) async def _wrapper(raw: BrokerMessage) -> BrokerMessageV1Payload: logger.info(f"Dispatching '{raw!s}'...") request = BrokerRequest(raw) user_token = REQUEST_USER_CONTEXT_VAR.set(request.user) headers_token = REQUEST_HEADERS_CONTEXT_VAR.set(request.headers) try: response = fn(request) if isawaitable(response): response = await response if isinstance(response, Response): content, status = await response.content(), response.status else: content, status = None, BrokerMessageV1Status.SUCCESS except ResponseException as exc: tb = traceback.format_exc() logger.error(f"Raised an application exception:\n {tb}") content, status = repr(exc), exc.status except Exception as exc: tb = traceback.format_exc() logger.exception(f"Raised a system exception:\n {tb}") content, status = repr(exc), BrokerMessageV1Status.SYSTEM_ERROR finally: headers = REQUEST_HEADERS_CONTEXT_VAR.get() REQUEST_USER_CONTEXT_VAR.reset(user_token) REQUEST_HEADERS_CONTEXT_VAR.reset(headers_token) return BrokerMessageV1Payload(content=content, status=status, headers=headers) return _wrapper
[docs] def get_action(self, topic: str) -> Callable[[Request], Union[Optional[Response], Awaitable[Optional[Response]]]]: """Get the handling function to be called. :param topic: The name of the topic that matches the action. :return: A ``Callable`` instance. """ if topic not in self._actions: raise MinosActionNotFoundException( f"topic {topic} have no controller/action configured, " f"please review th configuration file" ) action = self._actions[topic] logger.debug(f"Loaded {action!r} action!") return action