Source code for minos.networks.brokers.subscribers.memory

from __future__ import (
    annotations,
)

from asyncio import (
    Queue,
)
from collections.abc import (
    Iterable,
)

from ..messages import (
    BrokerMessage,
)
from .abc import (
    BrokerSubscriber,
    BrokerSubscriberBuilder,
)


[docs]class InMemoryBrokerSubscriber(BrokerSubscriber): """In Memory Broker Subscriber class.""" _queue: Queue[BrokerMessage]
[docs] def __init__(self, topics: Iterable[str], messages: Iterable[BrokerMessage] = tuple(), **kwargs): super().__init__(topics, **kwargs) self._queue = Queue() for message in messages: self._queue.put_nowait(message)
[docs] def add_message(self, message: BrokerMessage) -> None: """Add a message to the subscriber. :param message: The message to be added. :return: This method does not return anything. """ self._queue.put_nowait(message)
async def _receive(self) -> BrokerMessage: return await self._queue.get()
[docs]class InMemoryBrokerSubscriberBuilder(BrokerSubscriberBuilder[InMemoryBrokerSubscriber]): """In Memory Broker Subscriber Builder class."""
[docs] def with_messages(self, messages: Iterable[BrokerMessage]) -> InMemoryBrokerSubscriberBuilder: """Set messages. :param messages: The topics to be set. :return: This method return the builder instance. """ self.kwargs["messages"] = messages return self
InMemoryBrokerSubscriber.set_builder(InMemoryBrokerSubscriberBuilder)