from __future__ import (
annotations,
)
import logging
from asyncio import (
TimeoutError,
wait_for,
)
from contextlib import (
suppress,
)
from functools import (
partial,
)
from typing import (
Optional,
)
from aiokafka import (
AIOKafkaProducer,
)
from minos.networks import (
BrokerMessage,
BrokerPublisher,
BrokerPublisherBuilder,
)
from .common import (
KafkaBrokerBuilderMixin,
KafkaCircuitBreakerMixin,
)
logger = logging.getLogger(__name__)
[docs]class KafkaBrokerPublisher(BrokerPublisher, KafkaCircuitBreakerMixin):
"""Kafka Broker Publisher class."""
[docs] def __init__(self, *args, host: Optional[str] = None, port: Optional[int] = None, **kwargs):
super().__init__(*args, **kwargs)
if host is None:
host = "localhost"
if port is None:
port = 9092
self._host = host
self._port = port
self._client = None
@property
def host(self) -> str:
"""The host of kafka.
:return: A ``str`` value.
"""
return self._host
@property
def port(self) -> int:
"""The port of kafka.
:return: A ``int`` value.
"""
return self._port
async def _setup(self) -> None:
await super()._setup()
await self._start_client()
async def _destroy(self) -> None:
await self._stop_client()
await super()._destroy()
async def _start_client(self) -> None:
# noinspection PyBroadException
try:
await self.with_circuit_breaker(self.client.start)
except Exception as exc:
await self._stop_client()
raise exc
async def _stop_client(self):
with suppress(TimeoutError):
await wait_for(self._client.stop(), 0.5)
async def _send(self, message: BrokerMessage) -> None:
fn = partial(self.client.send_and_wait, message.topic, message.avro_bytes)
await self.with_circuit_breaker(fn)
@property
def client(self) -> AIOKafkaProducer:
"""Get the client instance.
:return: An ``AIOKafkaProducer`` instance.
"""
if self._client is None:
self._client = self._build_client()
return self._client
def _build_client(self) -> AIOKafkaProducer:
return AIOKafkaProducer(bootstrap_servers=self._bootstrap_servers)
@property
def _bootstrap_servers(self):
return f"{self.host}:{self.port}"
[docs]class KafkaBrokerPublisherBuilder(BrokerPublisherBuilder[KafkaBrokerPublisher], KafkaBrokerBuilderMixin):
"""Kafka Broker Publisher Builder class."""
KafkaBrokerPublisher.set_builder(KafkaBrokerPublisherBuilder)