Source code for minos.networks.brokers.collections.queues.abc

from __future__ import (
    annotations,
)

import logging
from abc import (
    ABC,
    abstractmethod,
)
from collections.abc import (
    AsyncIterator,
)

from minos.common import (
    BuildableMixin,
)

from ...messages import (
    BrokerMessage,
)

logger = logging.getLogger(__name__)


class BrokerQueue(ABC, BuildableMixin):
    """Abstract base class for queues of broker messages.

    The public ``enqueue`` and ``dequeue`` coroutines add debug logging around
    the abstract ``_enqueue`` and ``_dequeue`` hooks that concrete subclasses
    must implement. Instances are also asynchronous iterators: each iteration
    step awaits ``dequeue``.
    """

    async def enqueue(self, message: BrokerMessage) -> None:
        """Log the given message at debug level and pass it to ``_enqueue``.

        :param message: The message to be enqueued.
        :return: This method does not return anything.
        """
        logger.debug(f"Enqueuing {message!r} message...")
        await self._enqueue(message)

    @abstractmethod
    async def _enqueue(self, message: BrokerMessage) -> None:
        """Store the given message; must be overridden by subclasses.

        :param message: The message to be stored.
        :return: This method does not return anything.
        """
        raise NotImplementedError

    def __aiter__(self) -> AsyncIterator[BrokerMessage]:
        """Return the queue itself, as it is its own asynchronous iterator."""
        return self

    async def __anext__(self) -> BrokerMessage:
        """Await the next message, or stop iterating once the queue is destroyed.

        :return: The message obtained from ``dequeue``.
        :raises StopAsyncIteration: If ``self.already_destroyed`` is truthy.
        """
        # ``already_destroyed`` is not defined in this class; presumably it is
        # inherited from ``BuildableMixin`` -- TODO confirm.
        if not self.already_destroyed:
            return await self.dequeue()
        raise StopAsyncIteration

    async def dequeue(self) -> BrokerMessage:
        """Await a message from ``_dequeue``, log it at debug level and return it.

        :return: The message obtained from ``_dequeue``.
        """
        message = await self._dequeue()
        logger.debug(f"Dequeuing {message!r} message...")
        return message

    @abstractmethod
    async def _dequeue(self) -> BrokerMessage:
        """Retrieve the next message; must be overridden by subclasses.

        :return: The retrieved message.
        """
        raise NotImplementedError