Source code for minos.common.protocol.avro.messages

import logging
from typing import (
    Any,
)

from .base import (
    MinosAvroProtocol,
)

logger = logging.getLogger(__name__)


[docs]class MinosAvroMessageProtocol(MinosAvroProtocol): """Minos Avro Messages Protocol class."""
[docs] @classmethod def encode(cls, headers: dict, body: Any = None, *args, **kwargs) -> bytes: """ encoder in avro all the headers are converted in fields with double underscore name the body is a set fields coming from the data type. """ # prepare the headers final_data = dict() final_data["headers"] = headers if body: final_data["body"] = body return super().encode(final_data, _AVRO_SCHEMA)
[docs] @classmethod def decode(cls, data: bytes, *args, **kwargs) -> dict: """Decode the given bytes of data into a single dictionary or a sequence of dictionaries. :param data: A bytes object. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A dictionary or a list of dictionaries. """ data_return = {"headers": dict()} for schema_dict in super().decode(data, batch_mode=True): logger.debug("Avro: get the request/response in dict format") data_return["headers"] = schema_dict["headers"] # check wich type is body if "body" in schema_dict: if isinstance(schema_dict["body"], dict): data_return["body"] = {} elif isinstance(schema_dict["body"], list): data_return["body"] = [] data_return["body"] = schema_dict["body"] return data_return
_AVRO_SCHEMA = { "type": "record", "namespace": "org.minos.protocol", "name": "message", "fields": [ {"name": "headers", "type": {"type": "map", "values": ["string", "int", "bytes", "long", "float", "boolean"]}}, { "name": "body", "type": [ "null", "string", "int", "bytes", "boolean", "float", "long", {"type": "array", "items": ["int", "string", "bytes", "long", "boolean", "float"]}, { "type": "map", "values": [ "null", "int", "string", "bytes", "boolean", "float", "long", { "type": "array", "items": [ "string", "int", "bytes", "long", "boolean", "float", {"type": "map", "values": ["string", "int", "bytes", "long", "boolean", "float"]}, ], }, { "type": "map", "values": [ "string", "int", "bytes", "long", "boolean", "float", {"type": "map", "values": ["string", "int", "bytes", "long", "boolean", "float"]}, ], }, ], }, ], }, ], }