Source code for minos.networks.brokers.handlers.impl

from __future__ import (
    annotations,
)

import logging
from asyncio import (
    Queue,
    create_task,
    gather,
)
from collections.abc import (
    Iterable,
)
from typing import (
    NoReturn,
    Optional,
)

from minos.common import (
    Config,
    Inject,
    NotProvidedException,
    SetupMixin,
)

from ..dispatchers import (
    BrokerDispatcher,
)
from ..subscribers import (
    BrokerSubscriber,
    BrokerSubscriberBuilder,
)

logger = logging.getLogger(__name__)


[docs]class BrokerHandler(SetupMixin): """Broker Handler class."""
[docs] def __init__( self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5, *args, **kwargs ): super().__init__(*args, **kwargs) self._dispatcher = dispatcher self._subscriber = subscriber self._queue = Queue(maxsize=1) self._consumers = list() self._concurrency = concurrency
@classmethod def _from_config(cls, config: Config, **kwargs) -> BrokerHandler: dispatcher = cls._get_dispatcher(config, **kwargs) subscriber = cls._get_subscriber(config, topics=set(dispatcher.actions.keys()), **kwargs) return cls(dispatcher, subscriber, **kwargs) @staticmethod @Inject() def _get_dispatcher( config: Config, dispatcher: Optional[BrokerDispatcher] = None, broker_dispatcher: Optional[BrokerDispatcher] = None, **kwargs, ) -> BrokerDispatcher: if dispatcher is None: dispatcher = broker_dispatcher if dispatcher is None: dispatcher = BrokerDispatcher.from_config(config, **kwargs) return dispatcher @staticmethod @Inject() def _get_subscriber( config: Config, topics: Iterable[str], subscriber: Optional[BrokerSubscriber] = None, broker_subscriber: Optional[BrokerSubscriber] = None, subscriber_builder: Optional[BrokerSubscriberBuilder] = None, broker_subscriber_builder: Optional[BrokerSubscriberBuilder] = None, **kwargs, ) -> BrokerSubscriber: if not isinstance(subscriber, BrokerSubscriber): subscriber = broker_subscriber if not isinstance(subscriber, BrokerSubscriber): if not isinstance(subscriber_builder, BrokerSubscriberBuilder): subscriber_builder = broker_subscriber_builder if isinstance(subscriber_builder, BrokerSubscriberBuilder): subscriber = ( subscriber_builder.copy().with_config(config).with_topics(topics).with_kwargs(kwargs).build() ) if not isinstance(subscriber, BrokerSubscriber): raise NotProvidedException(f"A {BrokerSubscriber!r} or {BrokerSubscriberBuilder!r} must be provided.") return subscriber async def _setup(self) -> None: await super()._setup() await self._dispatcher.setup() await self._create_consumers() await self._subscriber.setup() async def _destroy(self) -> None: await self._subscriber.destroy() await self._destroy_consumers() await self._dispatcher.destroy() await super()._destroy()
[docs] async def run(self) -> NoReturn: """Run the message handling process. :return: This method does not return anything. """ async for message in self._subscriber: await self._queue.put(message)
async def _create_consumers(self): while len(self._consumers) < self._concurrency: self._consumers.append(create_task(self._consume())) async def _destroy_consumers(self): await self._queue.join() for consumer in self._consumers: consumer.cancel() await gather(*self._consumers, return_exceptions=True) self._consumers = list() async def _consume(self) -> None: while True: await self._consume_one() async def _consume_one(self) -> None: message = await self._queue.get() try: try: await self._dispatcher.dispatch(message) except Exception as exc: logger.warning(f"An exception was raised: {exc!r}") finally: self._queue.task_done()