Source code for minos.networks.brokers.pools

from __future__ import (
    annotations,
)

import logging
from contextvars import (
    Token,
)
from typing import (
    Any,
    AsyncContextManager,
    Optional,
)

from minos.common import (
    Config,
    Pool,
)

from .clients import (
    BrokerClient,
)
from .messages import (
    REQUEST_REPLY_TOPIC_CONTEXT_VAR,
)

logger = logging.getLogger(__name__)


[docs]class BrokerClientPool(Pool): """Broker Client Pool class."""
[docs] def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs): super().__init__(maxsize=maxsize, *args, **kwargs) self._instance_kwargs = instance_kwargs
@classmethod def _from_config(cls, config: Config, **kwargs) -> BrokerClientPool: return cls(kwargs | {"config": config}) async def _create_instance(self) -> BrokerClient: instance = BrokerClient.from_config(**self._instance_kwargs) await instance.setup() return instance async def _destroy_instance(self, instance: BrokerClient): await instance.destroy()
[docs] def acquire(self, *args, **kwargs) -> AsyncContextManager: """Acquire a new instance wrapped on an asynchronous context manager. :return: An asynchronous context manager. """ return _ReplyTopicContextManager(super().acquire())
class _ReplyTopicContextManager: _token: Optional[Token] def __init__(self, wrapper: AsyncContextManager[BrokerClient]): self.wrapper = wrapper self._token = None async def __aenter__(self) -> BrokerClient: broker = await self.wrapper.__aenter__() self._token = REQUEST_REPLY_TOPIC_CONTEXT_VAR.set(broker.topic) return broker async def __aexit__(self, exc_type, exc_val, exc_tb): REQUEST_REPLY_TOPIC_CONTEXT_VAR.reset(self._token) await self.wrapper.__aexit__(exc_type, exc_val, exc_tb)