Source code for minos.plugins.kafka.common

from __future__ import (
    annotations,
)

from collections.abc import (
    Iterable,
)

from kafka.errors import (
    KafkaError,
)

from minos.common import (
    Builder,
    CircuitBreakerMixin,
    Config,
)


[docs]class KafkaCircuitBreakerMixin(CircuitBreakerMixin): """Kafka Circuit Breaker Mixin class."""
[docs] def __init__(self, *args, circuit_breaker_exceptions: Iterable[type] = tuple(), **kwargs): super().__init__(*args, circuit_breaker_exceptions=(KafkaError, *circuit_breaker_exceptions), **kwargs)
[docs]class KafkaBrokerBuilderMixin(Builder): """Kafka Broker Builder Mixin class."""
[docs] def with_config(self, config: Config): """Set config. :param config: The config to be set. :return: This method return the builder instance. """ broker_config = config.get_interface_by_name("broker") common_config = broker_config.get("common", dict()) self.kwargs |= { "group_id": config.get_name(), "host": common_config.get("host"), "port": common_config.get("port"), } return super().with_config(config)