Source code for minos.networks.decorators.definitions.broker

from abc import (
    ABC,
)
from typing import (
    Final,
    Iterable,
)

from .abc import (
    EnrouteDecorator,
)
from .kinds import (
    EnrouteDecoratorKind,
)


class BrokerEnrouteDecorator(EnrouteDecorator, ABC):
    """Abstract base for enroute decorators that carry a broker ``topic``."""

    def __init__(self, topic: str, **kwargs):
        """Initialize the decorator.

        :param topic: Topic value; stored as-is on ``self.topic``.
        :param kwargs: Remaining keyword arguments, forwarded unchanged to the parent constructor.
        """
        # The parent constructor runs first, so ``topic`` is assigned after it.
        super().__init__(**kwargs)
        self.topic = topic

    def __iter__(self) -> Iterable:
        """Iterate over the decorator's fields, which here is only ``self.topic``.

        :return: A generator yielding exactly one item, the stored topic.
        """
        yield self.topic
class BrokerCommandEnrouteDecorator(BrokerEnrouteDecorator):
    """Broker enroute decorator whose ``KIND`` is ``EnrouteDecoratorKind.Command``."""

    # Class-level constant identifying this decorator's kind; declared ``Final``.
    KIND: Final[EnrouteDecoratorKind] = EnrouteDecoratorKind.Command
class BrokerQueryEnrouteDecorator(BrokerEnrouteDecorator):
    """Broker enroute decorator whose ``KIND`` is ``EnrouteDecoratorKind.Query``."""

    # Class-level constant identifying this decorator's kind; declared ``Final``.
    KIND: Final[EnrouteDecoratorKind] = EnrouteDecoratorKind.Query
class BrokerEventEnrouteDecorator(BrokerEnrouteDecorator):
    """Broker enroute decorator whose ``KIND`` is ``EnrouteDecoratorKind.Event``."""

    # Class-level constant identifying this decorator's kind; declared ``Final``.
    KIND: Final[EnrouteDecoratorKind] = EnrouteDecoratorKind.Event