Source code for minos.plugins.kafka.publisher

from __future__ import (
    annotations,
)

import logging
from asyncio import (
    TimeoutError,
    wait_for,
)
from contextlib import (
    suppress,
)
from functools import (
    partial,
)
from typing import (
    Optional,
)

from aiokafka import (
    AIOKafkaProducer,
)

from minos.networks import (
    BrokerMessage,
    BrokerPublisher,
    BrokerPublisherBuilder,
)

from .common import (
    KafkaBrokerBuilderMixin,
    KafkaCircuitBreakerMixin,
)

logger = logging.getLogger(__name__)


class KafkaBrokerPublisher(BrokerPublisher, KafkaCircuitBreakerMixin):
    """Kafka Broker Publisher class.

    Sends ``BrokerMessage`` instances to kafka through an ``AIOKafkaProducer`` that is built lazily on first access
    to the ``client`` property.
    """

    def __init__(self, *args, host: Optional[str] = None, port: Optional[int] = None, **kwargs):
        """Initialize the publisher.

        :param args: Additional positional arguments, forwarded to the parent constructor.
        :param host: The host of kafka. If ``None`` is provided, ``"localhost"`` is used.
        :param port: The port of kafka. If ``None`` is provided, ``9092`` is used.
        :param kwargs: Additional named arguments, forwarded to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        if host is None:
            host = "localhost"
        if port is None:
            port = 9092

        self._host = host
        self._port = port
        # The producer is created on demand by the ``client`` property.
        self._client = None

    @property
    def host(self) -> str:
        """The host of kafka.

        :return: A ``str`` value.
        """
        return self._host

    @property
    def port(self) -> int:
        """The port of kafka.

        :return: A ``int`` value.
        """
        return self._port

    async def _setup(self) -> None:
        # Parent setup goes first, then the kafka client is started.
        await super()._setup()
        await self._start_client()

    async def _destroy(self) -> None:
        # Mirror of ``_setup``: the kafka client is stopped before the parent teardown.
        await self._stop_client()
        await super()._destroy()

    async def _start_client(self) -> None:
        """Start the kafka client through the circuit breaker.

        If the start fails for any reason, the client is stopped before the original exception is propagated.
        """
        # noinspection PyBroadException
        try:
            await self.with_circuit_breaker(self.client.start)
        except Exception:
            await self._stop_client()
            # Bare ``raise`` re-raises the active exception without rebinding it.
            raise

    async def _stop_client(self) -> None:
        """Stop the kafka client, waiting at most half a second.

        A timeout while stopping is deliberately ignored (best effort shutdown).
        """
        client = self._client
        if client is None:
            # The client is built lazily, so nothing exists to stop if it was never accessed
            # (e.g. a destroy without a previous setup).
            return

        with suppress(TimeoutError):
            await wait_for(client.stop(), 0.5)

    async def _send(self, message: BrokerMessage) -> None:
        """Send a message to its topic, using its avro serialization as payload.

        :param message: The message to be sent.
        :return: This method does not return anything.
        """
        fn = partial(self.client.send_and_wait, message.topic, message.avro_bytes)
        await self.with_circuit_breaker(fn)

    @property
    def client(self) -> AIOKafkaProducer:
        """Get the client instance.

        The instance is built on first access and reused afterwards.

        :return: An ``AIOKafkaProducer`` instance.
        """
        if self._client is None:
            self._client = self._build_client()
        return self._client

    def _build_client(self) -> AIOKafkaProducer:
        return AIOKafkaProducer(bootstrap_servers=self._bootstrap_servers)

    @property
    def _bootstrap_servers(self) -> str:
        # ``host:port`` string handed to the producer as ``bootstrap_servers``.
        return f"{self.host}:{self.port}"


class KafkaBrokerPublisherBuilder(
    BrokerPublisherBuilder[KafkaBrokerPublisher],
    KafkaBrokerBuilderMixin,
):
    """Kafka Broker Publisher Builder class.

    Combines ``BrokerPublisherBuilder`` (parametrized with ``KafkaBrokerPublisher``) and ``KafkaBrokerBuilderMixin``
    without adding any behaviour of its own.
    """


# Register ``KafkaBrokerPublisherBuilder`` on ``KafkaBrokerPublisher`` through its ``set_builder`` hook.
# NOTE(review): presumably this makes it the builder used to construct the publisher — confirm in ``minos.networks``.
KafkaBrokerPublisher.set_builder(KafkaBrokerPublisherBuilder)