Source code for minos.networks.brokers.collections.queues.memory

import logging
from asyncio import (
    Queue,
    QueueEmpty,
    TimeoutError,
    wait_for,
)

from ...messages import (
    BrokerMessage,
)
from .abc import (
    BrokerQueue,
)

logger = logging.getLogger(__name__)


[docs]class InMemoryBrokerQueue(BrokerQueue): """In Memory Broker Queue class.""" _queue: Queue[BrokerMessage]
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._queue = Queue()
async def _destroy(self) -> None: await self._flush_queue() await super()._destroy() async def _flush_queue(self): try: await wait_for(self._queue.join(), 0.5) except TimeoutError: messages = list() while True: try: messages.append(self._queue.get_nowait()) except QueueEmpty: break logger.warning(f"Some messages were loosed: {messages}") async def _enqueue(self, message: BrokerMessage) -> None: await self._queue.put(message) async def _dequeue(self) -> BrokerMessage: message = await self._queue.get() self._queue.task_done() return message