Source code for minos.networks.decorators.api

from __future__ import (
    annotations,
)

from contextlib import (
    suppress,
)
from typing import (
    Optional,
    Protocol,
)

from minos.common import (
    get_internal_modules,
)

from .definitions import (
    BrokerCommandEnrouteDecorator,
    BrokerEventEnrouteDecorator,
    BrokerQueryEnrouteDecorator,
    EnrouteDecorator,
    PeriodicEventEnrouteDecorator,
    RestCommandEnrouteDecorator,
    RestQueryEnrouteDecorator,
)


class SubEnroute(Protocol):
    """Structural type describing a sub enroute namespace.

    Three members are declared: ``command``, ``query`` and ``event``. Each one
    is annotated as either an ``EnrouteDecorator`` or ``None``.
    """

    # Only annotations are declared here; no default values are assigned.
    command: Optional[EnrouteDecorator]
    query: Optional[EnrouteDecorator]
    event: Optional[EnrouteDecorator]
class BrokerEnroute:
    """Namespace exposing the broker enroute decorators.

    Binds ``command``, ``query`` and ``event`` to the corresponding broker
    decorator classes.
    """

    command = BrokerCommandEnrouteDecorator
    query = BrokerQueryEnrouteDecorator
    event = BrokerEventEnrouteDecorator
class RestEnroute:
    """Namespace exposing the REST enroute decorators.

    Binds ``command`` and ``query`` to the corresponding REST decorator
    classes; no ``event`` member is defined on this class.
    """

    command = RestCommandEnrouteDecorator
    query = RestQueryEnrouteDecorator
class PeriodicEnroute:
    """Namespace exposing the periodic enroute decorators.

    Binds ``event`` to the periodic event decorator class; no ``command`` or
    ``query`` member is defined on this class.
    """

    event = PeriodicEventEnrouteDecorator
class EnrouteType(type):
    """Metaclass adding a fallback attribute lookup to its classes.

    When regular attribute lookup on a class fails, every module returned by
    ``get_internal_modules()`` gets its ``_register_enroute()`` hook called
    and the lookup is then retried once.
    """

    def __getattr__(cls, item: str) -> SubEnroute:
        """Resolve an attribute that regular class lookup did not find.

        :param item: The name of the requested attribute.
        :return: The value stored on the class under ``item``.
        :raises AttributeError: If ``item`` is still missing after the hooks have been called.
        """
        for module in get_internal_modules():
            try:
                # noinspection PyProtectedMember
                module._register_enroute()
            except AttributeError:
                # Covers modules without the hook, and also any AttributeError
                # raised from inside a hook: both are skipped silently.
                continue

        # ``object.__getattribute__`` does not fall back to ``__getattr__``,
        # so a miss here cannot re-enter this method.
        try:
            return object.__getattribute__(cls, item)
        except AttributeError:
            pass

        # Raised outside the handler so no exception context is attached.
        raise AttributeError(f"{item} not in enroute.")
class Enroute(metaclass=EnrouteType):
    """Main enroute class, grouping the available sub enroutes.

    ``broker``, ``rest`` and ``periodic`` are bound at class definition time;
    further sub enroutes can be attached or removed through the private
    class methods below.
    """

    broker = BrokerEnroute
    rest = RestEnroute
    periodic = PeriodicEnroute

    @classmethod
    def _register_sub_enroute(cls, name: str, sub_enroute) -> None:
        """Attach a sub enroute to the class.

        :param name: The attribute name under which the sub enroute is stored.
        :param sub_enroute: The sub enroute to be registered.
        :return: This method does not return anything.
        """
        setattr(cls, name, sub_enroute)

    @classmethod
    def _unregister_sub_enroute(cls, name: str) -> None:
        """Detach a sub enroute from the class.

        :param name: The attribute name to be removed.
        :return: This method does not return anything.
        :raises AttributeError: If the class has no attribute called ``name`` to delete.
        """
        delattr(cls, name)
# Module-level lowercase alias bound to the ``Enroute`` class itself (not an instance).
# NOTE(review): presumably the name callers use when applying the decorators -- confirm against usage.
enroute = Enroute