Source code for minos.networks.brokers.publishers.abc

from __future__ import (
    annotations,
)

import logging
from abc import (
    ABC,
    abstractmethod,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Generic,
    Optional,
    TypeVar,
    Union,
)

from minos.common import (
    BuildableMixin,
    Builder,
    Config,
    Injectable,
    MinosConfigException,
)

from ..messages import (
    BrokerMessage,
)

if TYPE_CHECKING:
    from .queued import (
        BrokerPublisherQueue,
        QueuedBrokerPublisher,
    )

logger = logging.getLogger(__name__)


[docs]@Injectable("broker_publisher") class BrokerPublisher(ABC, BuildableMixin): """Broker Publisher class."""
[docs] async def send(self, message: BrokerMessage) -> None: """Send a message. :param message: The message to be sent. :return: This method does not return anything. """ logger.debug(f"Sending {message!r} message...") await self._send(message)
@abstractmethod async def _send(self, message: BrokerMessage) -> None: raise NotImplementedError
BrokerPublisherCls = TypeVar("BrokerPublisherCls", bound=BrokerPublisher)
[docs]class BrokerPublisherBuilder(Builder[BrokerPublisher], Generic[BrokerPublisherCls]): """Broker Publisher Builder class."""
[docs] def __init__( self, *args, queue_builder: Optional[Builder] = None, queued_cls: Optional[type[QueuedBrokerPublisher]] = None, **kwargs, ): super().__init__(*args, **kwargs) if queued_cls is None: from .queued import ( QueuedBrokerPublisher, ) queued_cls = QueuedBrokerPublisher self.queue_builder = queue_builder self.queued_cls = queued_cls
[docs] def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]): """Set the queued class. :param queued_cls: A subclass of ``QueuedBrokerPublisher``. :return: This method return the builder instance. """ self.queued_cls = queued_cls return self
[docs] def with_config(self, config: Config): """Set config. :param config: The config to be set. :return: This method return the builder instance. """ self._with_builders_from_config(config) if self.queue_builder is not None: self.queue_builder.with_config(config) return super().with_config(config)
def _with_builders_from_config(self, config): try: broker_config = config.get_interface_by_name("broker") except MinosConfigException: return broker_publisher_config = broker_config["publisher"] if "queue" in broker_publisher_config: self.with_queue(broker_publisher_config["queue"])
[docs] def with_queue(self, queue: Union[type[BrokerPublisherQueue], Builder[BrokerPublisherQueue]]): """Set the queue builder. :param queue: The queue builder to be set. :return: This method return the builder instance. """ if not isinstance(queue, Builder): queue = queue.get_builder() self.queue_builder = queue.copy() return self
[docs] def with_kwargs(self, kwargs: dict[str, Any]): """Set kwargs. :param kwargs: The kwargs to be set. :return: This method return the builder instance. """ if self.queue_builder is not None: self.queue_builder.with_kwargs(kwargs) return super().with_kwargs(kwargs)
[docs] def build(self) -> BrokerPublisher: """Build the instance. :return: A ``QueuedBrokerSubscriber`` instance. """ impl = super().build() if self.queue_builder is not None: queue = self.queue_builder.build() impl = self.queued_cls(impl=impl, queue=queue, **self.kwargs) return impl
BrokerPublisher.set_builder(BrokerPublisherBuilder)