Source code for minos.networks.brokers.publishers.queued.impl

from asyncio import (
    CancelledError,
    TimeoutError,
    create_task,
    wait_for,
)
from contextlib import (
    suppress,
)
from typing import (
    NoReturn,
)

from minos.common import (
    Builder,
)

from ...messages import (
    BrokerMessage,
)
from ..abc import (
    BrokerPublisher,
)
from .queues import (
    BrokerPublisherQueue,
)


[docs]class QueuedBrokerPublisher(BrokerPublisher): """Queued Broker Publisher class.""" impl: BrokerPublisher queue: BrokerPublisherQueue
[docs] def __init__(self, impl: BrokerPublisher, queue: BrokerPublisherQueue, *args, **kwargs): super().__init__(*args, **kwargs) self.impl = impl self.queue = queue self._run_task = None
async def _setup(self) -> None: await super()._setup() await self.queue.setup() await self.impl.setup() await self._start_task() async def _destroy(self) -> None: await self._stop_task() await self.impl.destroy() await self.queue.destroy() await super()._destroy() async def _start_task(self): if self._run_task is None: self._run_task = create_task(self._run()) async def _stop_task(self): if self._run_task is not None: self._run_task.cancel() with suppress(TimeoutError, CancelledError): await wait_for(self._run_task, 0.5) self._run_task = None async def _run(self) -> NoReturn: async for message in self.queue: await self.impl.send(message) async def _send(self, message: BrokerMessage) -> None: await self.queue.enqueue(message)
QueuedBrokerPublisher.set_builder(Builder)