Source code for minos.networks.brokers.subscribers.queued.impl

from __future__ import (
    annotations,
)

import warnings
from asyncio import (
    CancelledError,
    TimeoutError,
    create_task,
    wait_for,
)
from collections.abc import (
    Awaitable,
    Iterable,
)
from contextlib import (
    suppress,
)
from typing import (
    Any,
    NoReturn,
    Optional,
)

from minos.common import (
    Config,
)

from ...messages import (
    BrokerMessage,
)
from ..abc import (
    BrokerSubscriber,
    BrokerSubscriberBuilder,
)
from .queues import (
    BrokerSubscriberQueue,
    BrokerSubscriberQueueBuilder,
)


class QueuedBrokerSubscriber(BrokerSubscriber):
    """Broker subscriber that buffers the messages of another subscriber in a queue.

    A background task iterates over ``impl`` and enqueues every received message
    into ``queue``; receiving from this subscriber dequeues from ``queue``.
    """

    # Wrapped subscriber that is iterated by the background task.
    impl: BrokerSubscriber
    # Queue in which the messages obtained from ``impl`` are stored.
    queue: BrokerSubscriberQueue

    def __init__(self, impl: BrokerSubscriber, queue: BrokerSubscriberQueue, **kwargs):
        """Initialize the subscriber.

        :param impl: The subscriber whose messages are enqueued.
        :param queue: The queue used to store the messages.
        :param kwargs: Additional named arguments forwarded to the parent constructor. An optional ``topics`` entry
            overrides the topics taken from ``impl``.
        :raises ValueError: If the resulting topics differ from the ones of ``impl`` or ``queue``.
        """
        topics = kwargs.pop("topics", impl.topics)
        super().__init__(topics, **kwargs)

        # ``impl`` is compared first and ``queue`` only when ``impl`` matches.
        if any(self.topics != component.topics for component in (impl, queue)):
            raise ValueError("The topics from the impl and queue must be equal")

        self._run_task = None
        self.queue = queue
        self.impl = impl

    async def _setup(self) -> None:
        """Set up the parent, then the queue, then the impl, and finally start the background task."""
        await super()._setup()

        await self.queue.setup()
        await self.impl.setup()

        await self._start_run()

    async def _destroy(self) -> None:
        """Stop the background task, then destroy the impl, then the queue, and finally the parent."""
        await self._stop_run()

        await self.impl.destroy()
        await self.queue.destroy()

        await super()._destroy()

    async def _start_run(self):
        """Create the background task unless one is already stored."""
        if self._run_task is not None:
            return
        self._run_task = create_task(self._run())

    async def _stop_run(self):
        """Cancel the stored background task, wait up to 0.5 seconds for it, and clear the reference."""
        task = self._run_task
        if task is None:
            return

        task.cancel()
        # Neither the expected cancellation nor an expired wait should propagate to the caller.
        with suppress(TimeoutError, CancelledError):
            await wait_for(task, 0.5)
        self._run_task = None

    async def _run(self) -> NoReturn:
        """Enqueue into ``queue`` every message obtained by iterating over ``impl``."""
        async for received in self.impl:
            await self.queue.enqueue(received)

    def _receive(self) -> Awaitable[BrokerMessage]:
        """Return the awaitable produced by the queue's ``dequeue`` call."""
        pending = self.queue.dequeue()
        return pending
class QueuedBrokerSubscriberBuilder(BrokerSubscriberBuilder[QueuedBrokerSubscriber]):
    """Queued Broker Subscriber Builder class.

    Deprecated: creating an instance emits a ``DeprecationWarning``.

    The builder delegates its configuration to two inner builders, one for the wrapped subscriber
    (``impl_builder``) and one for the queue (``queue_builder``).
    """

    def __init__(
        self, *args, impl_builder: BrokerSubscriberBuilder, queue_builder: BrokerSubscriberQueueBuilder, **kwargs
    ):
        """Initialize the builder.

        :param args: Additional positional arguments forwarded to the parent constructor.
        :param impl_builder: The builder of the wrapped subscriber.
        :param queue_builder: The builder of the queue.
        :param kwargs: Additional named arguments forwarded to the parent constructor.
        """
        # ``stacklevel=2`` attributes the warning to the code that instantiates the builder instead of to this
        # line, which is required for the default warning filters to ever display a ``DeprecationWarning``.
        warnings.warn(
            f"{type(self)!r} has been deprecated. Use {BrokerSubscriberBuilder} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
        self.impl_builder = impl_builder
        self.queue_builder = queue_builder

    def with_config(self, config: Config) -> QueuedBrokerSubscriberBuilder:
        """Set config on both inner builders.

        :param config: The config to be set.
        :return: This method returns the builder instance.
        """
        self.impl_builder.with_config(config)
        self.queue_builder.with_config(config)
        return self

    def with_kwargs(self, kwargs: dict[str, Any]) -> QueuedBrokerSubscriberBuilder:
        """Set kwargs on both inner builders.

        :param kwargs: The kwargs to be set.
        :return: This method returns the builder instance.
        """
        self.impl_builder.with_kwargs(kwargs)
        self.queue_builder.with_kwargs(kwargs)
        return self

    def with_topics(self, topics: Iterable[str]) -> QueuedBrokerSubscriberBuilder:
        """Set topics on both inner builders.

        :param topics: The topics to be set.
        :return: This method returns the builder instance.
        """
        # Materialize once so that a one-shot iterable can be handed to both builders.
        topics = set(topics)
        self.impl_builder.with_topics(topics)
        self.queue_builder.with_topics(topics)
        return self

    def with_group_id(self, group_id: Optional[str]) -> QueuedBrokerSubscriberBuilder:
        """Set group_id on the impl builder (the queue builder does not receive it).

        :param group_id: The group_id to be set.
        :return: This method returns the builder instance.
        """
        self.impl_builder.with_group_id(group_id)
        return self

    def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool) -> QueuedBrokerSubscriberBuilder:
        """Set remove_topics_on_destroy on the impl builder (the queue builder does not receive it).

        :param remove_topics_on_destroy: The remove_topics_on_destroy flag to be set.
        :return: This method returns the builder instance.
        """
        self.impl_builder.with_remove_topics_on_destroy(remove_topics_on_destroy)
        return self

    def build(self) -> QueuedBrokerSubscriber:
        """Build the instance.

        The wrapped subscriber is built first and the queue second; both are passed to the
        ``QueuedBrokerSubscriber`` constructor together with ``self.kwargs``.

        :return: A ``QueuedBrokerSubscriber`` instance.
        """
        impl = self.impl_builder.build()
        queue = self.queue_builder.build()
        return QueuedBrokerSubscriber(impl=impl, queue=queue, **self.kwargs)