Source code for minos.plugins.rabbitmq.publisher

from __future__ import (
    annotations,
)

import logging
from typing import (
    Optional,
)

from aio_pika import (
    Message,
    connect,
)

from minos.networks import (
    BrokerMessage,
    BrokerPublisher,
    BrokerPublisherBuilder,
)

from .common import (
    RabbitMQBrokerBuilderMixin,
)

logger = logging.getLogger(__name__)


[docs]class RabbitMQBrokerPublisher(BrokerPublisher): """RabbitMQ Broker Publisher class."""
[docs] def __init__( self, *args, host: Optional[str] = None, port: Optional[int] = None, user: str = None, password: str = None, **kwargs, ): super().__init__(*args, **kwargs) if host is None: host = "localhost" if port is None: port = 5672 if user is None: user = "guest" if password is None: password = "guest" self.host = host self.port = port self.user = user self.password = password self.connection = None self.channel = None
async def _setup(self) -> None: await super()._setup() self.connection = await connect(f"amqp://{self.user}:{self.password}@{self.host}:{self.port}/") self.channel = await self.connection.channel() async def _destroy(self) -> None: await self.channel.close() await self.connection.close() await super()._destroy() async def _send(self, message: BrokerMessage) -> None: await self.channel.default_exchange.publish(Message(message.avro_bytes), routing_key=message.topic)
[docs]class RabbitMQBrokerPublisherBuilder(BrokerPublisherBuilder[RabbitMQBrokerPublisher], RabbitMQBrokerBuilderMixin): """RabbitMQ Broker Publisher Builder class."""
RabbitMQBrokerPublisher.set_builder(RabbitMQBrokerPublisherBuilder)