Source code for minos.common.model.abc

from __future__ import (
    annotations,
)

import logging
from abc import (
    abstractmethod,
)
from base64 import (
    b64decode,
    b64encode,
)
from collections.abc import (
    Mapping,
)
from typing import (
    Any,
    Iterable,
    Iterator,
    Type,
    TypedDict,
    TypeVar,
    Union,
)

from ..exceptions import (
    EmptyMinosModelSequenceException,
    MultiTypeMinosModelSequenceException,
)
from ..importlib import (
    classname,
)
from ..meta import (
    classproperty,
    property_or_classproperty,
    self_or_classmethod,
)
from ..protocol import (
    MinosAvroProtocol,
)
from .fields import (
    Field,
)
from .serializers import (
    AvroDataDecoder,
    AvroDataEncoder,
    AvroSchemaDecoder,
    AvroSchemaEncoder,
    DataDecoder,
    DataEncoder,
    SchemaDecoder,
    SchemaEncoder,
)
from .types import (
    MissingSentinel,
    ModelType,
)

logger = logging.getLogger(__name__)


[docs]class Model(Mapping): """Base class for ``minos`` model entities.""" _field_cls: Type[Field] = Field _fields: dict[str, Field] __eq_reversing: bool
[docs] def __init__(self, fields: Union[Iterable[Field], dict[str, Field]] = None, **kwargs): """Class constructor. :param fields: Dictionary that contains the ``Field`` instances of the model indexed by name. """ if fields is None: fields = dict() if not isinstance(fields, dict): fields = {field.name: field for field in fields} self._fields = fields self.__eq_reversing = False
[docs] @classmethod def from_typed_dict(cls: Type[T], typed_dict: TypedDict, *args, **kwargs) -> T: """Build a ``Model`` from a ``TypeDict`` and ``data``. :param typed_dict: ``TypeDict`` object containing the DTO's structure :param args: Positional arguments to be passed to the model constructor. :param kwargs: Named arguments to be passed to the model constructor. :return: A new ``DataTransferObject`` instance. """ return cls.from_model_type(ModelType.from_typed_dict(typed_dict), *args, **kwargs)
[docs] @classmethod @abstractmethod def from_model_type(cls: Type[T], model_type: ModelType, *args, **kwargs) -> T: """Build a ``Model`` from a ``ModelType``. :param model_type: ``ModelType`` object containing the DTO's structure :param args: Positional arguments to be passed to the model constructor. :param kwargs: Named arguments to be passed to the model constructor. :return: A new ``DeclarativeModel`` instance. """
[docs] @classmethod def from_avro_str(cls: Type[T], raw: str, **kwargs) -> Union[T, list[T]]: """Build a single instance or a sequence of instances from bytes :param raw: A ``str`` representation of the model. :return: A single instance or a sequence of instances. """ raw = b64decode(raw.encode()) return cls.from_avro_bytes(raw, **kwargs)
# noinspection PyUnusedLocal
[docs] @classmethod def from_avro_bytes(cls: Type[T], raw: bytes, batch_mode: bool = False, **kwargs) -> Union[T, list[T]]: """Build a single instance or a sequence of instances from bytes. :param raw: A ``bytes`` representation of the model. :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a single model. :param kwargs: Additional named arguments. :return: A single instance or a sequence of instances. """ schema = MinosAvroProtocol.decode_schema(raw) data = MinosAvroProtocol.decode(raw, batch_mode=batch_mode) if batch_mode: return [cls.from_avro(schema, entry) for entry in data] return cls.from_avro(schema, data)
[docs] @classmethod def from_avro(cls: Type[T], schema: Any, data: Any) -> T: """Build a new instance from the ``avro`` schema and data. :param schema: The avro schema of the model. :param data: The avro data of the model. :return: A new ``DynamicModel`` instance. """ schema_decoder = AvroSchemaDecoder() type_ = schema_decoder.build(schema) data_decoder = AvroDataDecoder() instance = data_decoder.build(data, type_) return instance
[docs] @classmethod def to_avro_str(cls: Type[T], models: list[T]) -> str: """Build the avro string representation of the given object instances. :param models: A sequence of minos models. :return: A bytes object. """ return b64encode(cls.to_avro_bytes(models)).decode()
[docs] @classmethod def to_avro_bytes(cls: Type[T], models: list[T]) -> bytes: """Create a ``bytes`` representation of the given object instances. :param models: A sequence of minos models. :return: A bytes object. """ if len(models) == 0: raise EmptyMinosModelSequenceException("'models' parameter cannot be empty.") model_type = type(models[0]) if not all(model_type == type(model) for model in models): raise MultiTypeMinosModelSequenceException( f"Every model must have type {model_type} to be valid. Found types: {[type(model) for model in models]}" ) avro_schema = models[0].avro_schema # noinspection PyTypeChecker return MinosAvroProtocol.encode([model.avro_data for model in models], avro_schema, batch_mode=True)
# noinspection PyMethodParameters @property_or_classproperty def model_type(self_or_cls) -> ModelType: """Get the model type of the instance. :return: A ``ModelType`` instance. """ # noinspection PyTypeChecker return ModelType.from_model(self_or_cls) # noinspection PyMethodParameters @classproperty def classname(cls) -> str: """Compute the current class namespace. :return: An string object. """ # noinspection PyTypeChecker return classname(cls) # noinspection PyMethodParameters @property_or_classproperty def type_hints(self_or_cls) -> dict[str, type]: """Get the type hinting of the instance or class. :return: A dictionary in which the keys are the field names and the values are the types. """ return dict(self_or_cls._type_hints()) # noinspection PyMethodParameters @self_or_classmethod def _type_hints(self_or_cls) -> Iterator[tuple[str, Any]]: if not isinstance(self_or_cls, type): yield from ((field.name, field.real_type) for field in self_or_cls.fields.values()) # noinspection PyMethodParameters @property_or_classproperty def type_hints_parameters(self_or_cls) -> tuple[TypeVar, ...]: """Get the sequence of generic type hints parameters.. :return: A tuple of `TypeVar` instances. """ return getattr(self_or_cls, "__parameters__", tuple()) @property def fields(self) -> dict[str, Field]: """Fields getter""" return self._fields def __setitem__(self, key: str, value: Any) -> None: try: self._fields[key].value = value except KeyError: raise KeyError(f"{type(self).__name__!r} does not contain the {key!r} field") def __getitem__(self, item: str) -> Any: try: return self._fields[item].value except KeyError: raise KeyError(f"{type(self).__name__!r} does not contain the {item!r} field.") def __setattr__(self, key: str, value: Any) -> None: if key.startswith("_"): object.__setattr__(self, key, value) return if key not in self._fields: raise AttributeError(f"{type(self).__name__!r} does not contain the {key!r} attribute.") self[key] = value def __getattr__(self, item: str) -> Any: if item.startswith("_") or item not in self._fields: raise AttributeError(f"{type(self).__name__!r} does not contain the {item!r} attribute.") return self[item] # noinspection PyMethodParameters @property_or_classproperty def avro_schema(self_or_cls) -> list[dict[str, Any]]: """Compute the avro schema of the model. :return: A dictionary object. """ # noinspection PyTypeChecker encoder = AvroSchemaEncoder() return encoder.build(self_or_cls) @property def avro_data(self) -> dict[str, Any]: """Compute the avro data of the model. :return: A dictionary object. """ encoder = AvroDataEncoder() return encoder.build(self) @property def avro_str(self) -> str: """Generate bytes representation of the current instance. :return: A bytes object. """ # noinspection PyTypeChecker return b64encode(self.avro_bytes).decode() @property def avro_bytes(self) -> bytes: """Generate bytes representation of the current instance. :return: A bytes object. """ # noinspection PyTypeChecker return MinosAvroProtocol.encode(self.avro_data, self.avro_schema) # noinspection PyUnusedLocal
[docs] @staticmethod def encode_schema(encoder: SchemaEncoder, target: Any, **kwargs) -> Any: """Encode schema with the given encoder. :param encoder: The encoder instance. :param target: An optional pre-encoded schema. :param kwargs: Additional named arguments. :return: The encoded schema of the instance. """ return MissingSentinel
# noinspection PyUnusedLocal
[docs] @staticmethod def decode_schema(decoder: SchemaDecoder, target: Any, **kwargs) -> Any: """Decode schema with the given encoder. :param decoder: The decoder instance. :param target: The schema to be decoded. :param kwargs: Additional named arguments. :return: The decoded schema as a type. """ return MissingSentinel
# noinspection PyUnusedLocal
[docs] @staticmethod def encode_data(encoder: DataEncoder, target: Any, **kwargs) -> Any: """Encode data with the given encoder. :param encoder: The encoder instance. :param target: An optional pre-encoded data. :param kwargs: Additional named arguments. :return: The encoded data of the instance. """ return MissingSentinel
# noinspection PyUnusedLocal
[docs] @staticmethod def decode_data(decoder: DataDecoder, target: Any, type_: ModelType, **kwargs) -> Any: """Decode data with the given decoder. :param decoder: The decoder instance. :param target: The data to be decoded. :param type_: The data type. :param kwargs: Additional named arguments. :return: A decoded instance. """ return MissingSentinel
def __eq__(self: T, other: T) -> bool: if type(self) == type(other) and self.fields == other.fields: return True if not self.__eq_reversing: try: self.__eq_reversing = True return other == self finally: self.__eq_reversing = False return False def __hash__(self) -> int: return hash(tuple(self.fields.values())) def __iter__(self) -> Iterable[str]: yield from self.fields.keys() def __len__(self): return len(self.fields) def __repr__(self) -> str: fields_repr = ", ".join(repr(field) for field in self.fields.values()) return f"{type(self).__name__}({fields_repr})"
T = TypeVar("T", bound=Model)