Source code for minos.networks.brokers.subscribers.abc

from __future__ import (
    annotations,
)

import logging
from abc import (
    ABC,
    abstractmethod,
)
from collections.abc import (
    AsyncIterator,
    Iterable,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Generic,
    Optional,
    TypeVar,
    Union,
)

from minos.common import (
    BuildableMixin,
    Builder,
    Config,
    Injectable,
    MinosConfigException,
)

from ..messages import (
    BrokerMessage,
)

if TYPE_CHECKING:
    from .filtered import (
        BrokerSubscriberValidator,
        FilteredBrokerSubscriber,
    )
    from .queued import (
        BrokerSubscriberQueue,
        BrokerSubscriberQueueBuilder,
        QueuedBrokerSubscriber,
    )


logger = logging.getLogger(__name__)


class BrokerSubscriber(ABC, BuildableMixin):
    """Broker Subscriber class.

    Abstract base for objects that receive ``BrokerMessage`` instances from a set of topics. Concrete subclasses
    must implement ``_receive``. Instances are asynchronous iterators, so ``async for message in subscriber``
    yields received messages until the subscriber reports itself as destroyed.
    """

    def __init__(self, topics: Iterable[str], **kwargs):
        """Initialize the subscriber.

        :param topics: The topics to subscribe to. They are stored as a ``set``, so duplicates are dropped.
        :param kwargs: Additional named arguments, forwarded to the parent initializer.
        """
        super().__init__(**kwargs)
        self._topics = set(topics)

    @property
    def topics(self) -> set[str]:
        """Topics getter.

        :return: A set of string values.
        """
        return self._topics

    def __aiter__(self) -> AsyncIterator[BrokerMessage]:
        # The subscriber is its own asynchronous iterator.
        return self

    async def __anext__(self) -> BrokerMessage:
        # ``already_destroyed`` is not defined in this class; presumably inherited through ``BuildableMixin``
        # (TODO confirm). Once it is truthy the iteration ends instead of waiting for another message.
        if self.already_destroyed:
            raise StopAsyncIteration
        return await self.receive()

    async def receive(self) -> BrokerMessage:
        """Receive a new message.

        Delegates to ``_receive`` and logs the obtained message at debug level.

        :return: A ``BrokerMessage`` instance.
        """
        message = await self._receive()
        # Lazy logging arguments: ``repr(message)`` is only computed when the debug level is enabled.
        logger.debug("Receiving %r message...", message)
        return message

    @abstractmethod
    async def _receive(self) -> BrokerMessage:
        """Obtain the next message (to be implemented by concrete subscribers).

        :return: A ``BrokerMessage`` instance.
        """
        raise NotImplementedError
# Type variable for the concrete ``BrokerSubscriber`` subclass that a ``BrokerSubscriberBuilder`` is parametrized with.
BrokerSubscriberCls = TypeVar("BrokerSubscriberCls", bound=BrokerSubscriber)
@Injectable("broker_subscriber_builder")
class BrokerSubscriberBuilder(Builder[BrokerSubscriberCls], Generic[BrokerSubscriberCls]):
    """Broker Subscriber Builder class.

    Builds a subscriber and, when a validator builder and/or a queue builder have been set, wraps it with the
    configured filtered and/or queued subscriber classes.
    """

    def __init__(
        self,
        *args,
        validator_builder: Optional[Builder] = None,
        queue_builder: Optional[BrokerSubscriberQueueBuilder] = None,
        filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None,
        queued_cls: Optional[type[QueuedBrokerSubscriber]] = None,
        **kwargs,
    ):
        """Initialize the builder.

        :param args: Additional positional arguments, forwarded to the parent initializer.
        :param validator_builder: Optional builder of the validator used by the filtered subscriber.
        :param queue_builder: Optional builder of the queue used by the queued subscriber.
        :param filtered_cls: The filtered subscriber class. Defaults to ``FilteredBrokerSubscriber``.
        :param queued_cls: The queued subscriber class. Defaults to ``QueuedBrokerSubscriber``.
        :param kwargs: Additional named arguments, forwarded to the parent initializer.
        """
        super().__init__(*args, **kwargs)

        # The default classes are imported here rather than at module level, where they are only imported for
        # type checking (presumably to avoid a circular import -- TODO confirm).
        if filtered_cls is None:
            from .filtered import (
                FilteredBrokerSubscriber as filtered_cls,
            )

        if queued_cls is None:
            from .queued import (
                QueuedBrokerSubscriber as queued_cls,
            )

        self.validator_builder = validator_builder
        self.queue_builder = queue_builder
        self.filtered_cls = filtered_cls
        self.queued_cls = queued_cls

    def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]):
        """Set the class used to wrap the subscriber when a validator builder is set.

        :param filtered_cls: A subclass of ``FilteredBrokerSubscriber``.
        :return: The builder instance itself.
        """
        self.filtered_cls = filtered_cls
        return self

    def with_queued_cls(self, queued_cls: type[QueuedBrokerSubscriber]):
        """Set the class used to wrap the subscriber when a queue builder is set.

        :param queued_cls: A subclass of ``QueuedBrokerSubscriber``.
        :return: The builder instance itself.
        """
        self.queued_cls = queued_cls
        return self

    def with_config(self, config: Config):
        """Set the config on this builder and on the nested builders.

        The validator and queue builders declared in the config (if any) are registered first, so that they also
        receive the config.

        :param config: The config to be set.
        :return: The value returned by the parent ``with_config`` implementation.
        """
        self._with_builders_from_config(config)

        for nested_builder in (self.validator_builder, self.queue_builder):
            if nested_builder is not None:
                nested_builder.with_config(config)

        return super().with_config(config)

    def _with_builders_from_config(self, config):
        # Register the validator and/or queue declared under the "subscriber" entry of the "broker" interface.
        # Nothing is done when the config raises ``MinosConfigException`` for the "broker" interface.
        try:
            broker_section = config.get_interface_by_name("broker")
        except MinosConfigException:
            return

        subscriber_section = broker_section["subscriber"]

        setters = (
            ("validator", self.with_validator),
            ("queue", self.with_queue),
        )
        for key, setter in setters:
            if key in subscriber_section:
                setter(subscriber_section[key])

    def with_validator(
        self,
        validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]],
    ):
        """Set the validator builder.

        :param validator: Either a builder instance or an object exposing ``get_builder()`` (such as a validator
            class). A copy of the resulting builder is stored.
        :return: The builder instance itself.
        """
        resolved = validator if isinstance(validator, Builder) else validator.get_builder()
        self.validator_builder = resolved.copy()
        return self

    def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]):
        """Set the queue builder.

        :param queue: Either a builder instance or an object exposing ``get_builder()`` (such as a queue class).
            A copy of the resulting builder is stored.
        :return: The builder instance itself.
        """
        resolved = queue if isinstance(queue, Builder) else queue.get_builder()
        self.queue_builder = resolved.copy()
        return self

    def with_kwargs(self, kwargs: dict[str, Any]):
        """Set kwargs on this builder and on the nested builders.

        :param kwargs: The kwargs to be set.
        :return: The value returned by the parent ``with_kwargs`` implementation.
        """
        for nested_builder in (self.validator_builder, self.queue_builder):
            if nested_builder is not None:
                nested_builder.with_kwargs(kwargs)

        return super().with_kwargs(kwargs)

    def with_group_id(self, group_id: Optional[str]):
        """Set group_id.

        :param group_id: The group_id to be stored in the builder kwargs.
        :return: The builder instance itself.
        """
        self.kwargs["group_id"] = group_id
        return self

    def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool):
        """Set remove_topics_on_destroy.

        :param remove_topics_on_destroy: The flag to be stored in the builder kwargs.
        :return: The builder instance itself.
        """
        self.kwargs["remove_topics_on_destroy"] = remove_topics_on_destroy
        return self

    def with_topics(self, topics: Iterable[str]):
        """Set topics on this builder and, when present, on the queue builder.

        :param topics: The topics to be set.
        :return: The builder instance itself.
        """
        unique_topics = set(topics)

        # A separate set object is stored, so the one handed to the queue builder is not shared with the kwargs.
        self.kwargs["topics"] = set(unique_topics)

        if self.queue_builder is not None:
            self.queue_builder.with_topics(unique_topics)

        return self

    def build(self) -> BrokerSubscriber:
        """Build the instance.

        :return: The subscriber built by the parent implementation, wrapped by ``filtered_cls`` when a validator
            builder is set and then by ``queued_cls`` when a queue builder is set.
        """
        subscriber = super().build()

        if self.validator_builder is not None:
            built_validator = self.validator_builder.build()
            subscriber = self.filtered_cls(impl=subscriber, validator=built_validator, **self.kwargs)

        if self.queue_builder is not None:
            built_queue = self.queue_builder.build()
            subscriber = self.queued_cls(impl=subscriber, queue=built_queue, **self.kwargs)

        return subscriber
# Associate ``BrokerSubscriberBuilder`` with ``BrokerSubscriber``. This is done at module level because the builder
# class is defined after the subscriber class. (Presumably this is what ``get_builder()`` returns afterwards --
# confirm in ``BuildableMixin``.)
BrokerSubscriber.set_builder(BrokerSubscriberBuilder)