# Source code for minos.common.protocol.avro.base

import io
from typing import (
    Any,
    Union,
)

from fastavro import (
    parse_schema,
    reader,
    writer,
)

from ...exceptions import (
    MinosProtocolException,
)
from ..abc import (
    MinosBinaryProtocol,
)


class MinosAvroProtocol(MinosBinaryProtocol):
    """Minos Avro Protocol class.

    Encodes values into avro bytes and decodes them back, delegating the actual (de)serialization to ``fastavro``.
    Any failure raised while encoding or decoding is re-raised as a ``MinosProtocolException``.
    """

    @classmethod
    def encode(cls, value: Any, schema: Any, *args, batch_mode: bool = False, **kwargs) -> bytes:
        """Encode the given value into avro bytes according to the given schema.

        :param value: The data to be stored.
        :param schema: The schema relative to the data. It can be a single schema or a list of schemas. When a list
            is given, the first entry is the one used to write the data and all the entries are registered as named
            schemas that the first one can reference.
        :param args: Additional positional arguments.
        :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a
            single model.
        :param kwargs: Additional named arguments.
        :return: A bytes object.
        :raises MinosProtocolException: If the schema cannot be parsed or the data cannot be written.
        """
        # The writer always receives a sequence of records, so a single value is wrapped into a list.
        if not batch_mode:
            value = [value]

        if not isinstance(schema, list):
            schema = [schema]

        try:
            raw_schema = cls._parse_schema(schema)
            return cls._write_data(value, raw_schema)
        except Exception as exc:
            raise MinosProtocolException(f"Error encoding data: {exc!r}") from exc

    @staticmethod
    def _parse_schema(schema: list[dict[str, Any]]) -> dict[str, Any]:
        """Parse a list of schemas into a single expanded schema.

        :param schema: A list of schemas whose first entry is the main one.
        :return: The first schema, parsed with ``expand=True`` against the named schemas collected from the list.
        """
        named_schemas = {}
        # Register every entry (from last to first) so that the main schema can reference named types defined in
        # any of them, not only in the first two entries of the list.
        for item in reversed(schema):
            parse_schema(item, named_schemas)
        return parse_schema(schema[0], named_schemas, expand=True)

    @staticmethod
    def _write_data(value: list[dict[str, Any]], schema: dict[str, Any]) -> bytes:
        """Write the given records into an in-memory avro container.

        :param value: The records to be written.
        :param schema: The parsed schema used to write the records.
        :return: The content of the in-memory buffer as a bytes object.
        """
        with io.BytesIO() as file:
            writer(file, schema, value)
            # ``getvalue`` returns the whole buffer regardless of the current stream position.
            return file.getvalue()

    @classmethod
    def decode(cls, data: bytes, *args, batch_mode: bool = False, **kwargs) -> Any:
        """Decode the given bytes of data into a single dictionary or a sequence of dictionaries.

        :param data: A bytes object.
        :param args: Additional positional arguments.
        :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a
            single model.
        :param kwargs: Additional named arguments.
        :return: A dictionary or a list of dictionaries.
        :raises MinosProtocolException: If the bytes cannot be read, or if ``batch_mode`` is ``False`` and the data
            does not contain exactly one value.
        """
        try:
            with io.BytesIO(data) as file:
                ans = list(reader(file))
        except Exception as exc:
            raise MinosProtocolException(f"Error decoding the avro bytes: {exc}") from exc

        if batch_mode:
            return ans

        if len(ans) > 1:
            raise MinosProtocolException(
                f"The 'batch_mode' argument was set to {False!r} but data contains multiple values: {ans!r}"
            )
        if not ans:
            # Report an empty container through the protocol exception instead of leaking an ``IndexError``.
            raise MinosProtocolException(
                f"The 'batch_mode' argument was set to {False!r} but data does not contain any value."
            )
        return ans[0]

    # noinspection PyUnusedLocal
    @classmethod
    def decode_schema(cls, data: bytes, *args, **kwargs) -> Union[dict[str, Any], list[dict[str, Any]]]:
        """Extract the writer schema stored in the given avro bytes.

        :param data: A bytes object.
        :param args: Additional positional arguments.
        :param kwargs: Additional named arguments.
        :return: The ``writer_schema`` exposed by the avro reader for the given data.
        :raises MinosProtocolException: If the schema cannot be obtained from the data.
        """
        try:
            with io.BytesIO(data) as file:
                return reader(file).writer_schema
        except Exception as exc:
            raise MinosProtocolException(f"Error getting avro schema: {exc}") from exc