Source code for minos.networks.brokers.messages.models.v1

from __future__ import (
    annotations,
)

import logging
import warnings
from enum import (
    Enum,
)
from functools import (
    total_ordering,
)
from typing import (
    Any,
    Optional,
)
from uuid import (
    UUID,
    uuid4,
)

from minos.common import (
    DeclarativeModel,
)

from .abc import (
    BrokerMessage,
)

logger = logging.getLogger(__name__)


[docs]@total_ordering class BrokerMessageV1(BrokerMessage, DeclarativeModel): """Broker Message V1 class.""" topic: str identifier: UUID reply_topic: Optional[str] strategy: BrokerMessageV1Strategy # FIXME: Remove this attribute! payload: BrokerMessageV1Payload
[docs] def __init__( self, topic: str, payload: BrokerMessageV1Payload, *, identifier: Optional[UUID] = None, strategy: Optional[BrokerMessageV1Strategy] = None, **kwargs, ): if identifier is None: identifier = uuid4() if strategy is None: strategy = BrokerMessageV1Strategy.UNICAST super().__init__(topic=topic, identifier=identifier, strategy=strategy, payload=payload, **kwargs)
# noinspection PyPropertyDefinition @classmethod @property def version(cls) -> int: """Get the version of the message. :return: A strictly positive ``int`` value. """ return 1 @property def topic(self) -> str: """Get the topic of the message. :return: A ``str`` value. """ return self.fields["topic"].value @property def identifier(self) -> UUID: """Get the identifier of the message. :return: An ``UUID`` instance. """ return self.fields["identifier"].value @property def reply_topic(self) -> Optional[str]: """Get the reply topic of the message if there is someone. :return: A ``str`` value or ``None``. """ return self.fields["reply_topic"].value
[docs] def set_reply_topic(self, value: Optional[str]) -> None: """Set the message's reply topic. :param value: A ``str`` value or ``None``. :return: This method does not return anything. """ self.fields["reply_topic"].value = value
@property def content(self) -> Any: """Get the content of the message. :return: Any value. """ return self.payload.content @property def ok(self) -> bool: """Check if the message is okay or not. :return: ``True`` if the message is okay or ``False`` otherwise. """ return self.payload.ok @property def status(self) -> int: """Get the payload status. :return: A ``BrokerMessageV1Status`` instance. """ return self.payload.status @property def headers(self) -> dict[str, str]: """Get the payload headers. :return: A `dict` with `str` keys and `str` values. """ return self.payload.headers @property def data(self) -> Any: """Get the payload content. :return: Any value. """ warnings.warn("The `BrokerMessage.data` attribute has been deprecated", DeprecationWarning) return self.payload.content def __lt__(self, other: Any) -> bool: # noinspection PyBroadException return isinstance(other, type(self)) and self.payload < other.payload
[docs]@total_ordering class BrokerMessageV1Payload(DeclarativeModel): """Broker Message Payload class.""" content: Any status: BrokerMessageV1Status headers: dict[str, str]
[docs] def __init__( self, content: Any = None, headers: Optional[dict[str, str]] = None, status: Optional[int] = None, **kwargs ): if headers is None: headers = dict() if status is None: status = BrokerMessageV1Status.SUCCESS super().__init__(content=content, status=status, headers=headers, **kwargs)
@property def ok(self) -> bool: """Check if the message is okay or not. :return: ``True`` if the message is okay or ``False`` otherwise. """ return self.status == BrokerMessageV1Status.SUCCESS @property def data(self) -> Any: """Get the content. :return: Any value. """ warnings.warn("The `BrokerMessage.data` attribute has been deprecated", DeprecationWarning) return self.content def __lt__(self, other: Any) -> bool: # noinspection PyBroadException try: return isinstance(other, type(self)) and self.content < other.content except Exception: return False
[docs]class BrokerMessageV1Status(int, Enum): """Broker Message Status v1 class.""" SUCCESS = 200 ERROR = 400 SYSTEM_ERROR = 500 UNKNOWN = 600 @classmethod def _missing_(cls, value: Any) -> BrokerMessageV1Status: logger.warning(f"The given status value is unknown: {value}") return cls.UNKNOWN
[docs]class BrokerMessageV1Strategy(str, Enum): """Broker Message Strategy class""" UNICAST = "unicast" MULTICAST = "multicast"