from __future__ import (
annotations,
)
import logging
from collections import (
defaultdict,
)
from itertools import (
chain,
)
from typing import (
Any,
Generic,
Iterable,
Iterator,
Type,
TypeVar,
Union,
)
from uuid import (
uuid4,
)
from minos.common import (
BucketModel,
Field,
Model,
ModelType,
)
from ..actions import (
Action,
)
from ..collections import (
IncrementalSet,
)
logger = logging.getLogger(__name__)
T = TypeVar("T")
[docs]class FieldDiff(Model, Generic[T]):
"""Field Diff class."""
name: str
value: T
[docs] def __init__(self, name: str, type_: Type, value: Any, **kwargs):
super().__init__([self._field_cls("name", str, name), self._field_cls("value", type_, value)], **kwargs)
[docs] @classmethod
def from_model_type(cls, model_type: ModelType, *args, **kwargs) -> FieldDiff:
"""Build a new instance from model type.
:param model_type: The model type.
:param args: Additional positional. arguments.
:param kwargs: Additional named arguments.
:return: A new ``FieldDiff`` instance.
"""
kwargs["type_"] = model_type.type_hints["value"]
return cls(*args, **kwargs)
[docs]class IncrementalFieldDiff(FieldDiff, Generic[T]):
"""Incremental Field Diff class."""
action: Action
[docs] def __init__(self, name: str, type_: Type, value: Any, action: Action, **kwargs):
Model.__init__(
self,
[
self._field_cls("name", str, name),
self._field_cls("value", type_, value),
self._field_cls("action", Action, action),
],
**kwargs,
)
[docs]class FieldDiffContainer(BucketModel):
"""Field Diff Container class."""
[docs] def __init__(
self, diffs: Iterable[FieldDiff] = None, fields: Union[Iterable[Field], dict[str, Field]] = None, **kwargs
):
if diffs is not None:
fields = map(lambda v: self._field_cls(self.generate_random_str(), FieldDiff, v), diffs)
super().__init__(fields, **kwargs)
self._mapper = _build_mapper(self._fields)
def __getitem__(self, item: str) -> Any:
try:
return super().__getitem__(item)
except KeyError as exc:
try:
return self.get_one(item)
except Exception:
raise exc
def __eq__(self, other):
return type(self) == type(other) and tuple(self.values()) == tuple(other.values())
def __repr__(self) -> str:
fields_repr = ", ".join(f"{k}={v}" for k, v in self.items())
return f"{type(self).__name__}({fields_repr})"
def __iter__(self) -> Iterable[str]:
"""Get the field names.
:return: An iterable of string values.
"""
yield from self._mapper.keys()
[docs] def get_one(self, name: str, return_diff: bool = True) -> Union[FieldDiff, Any, list[FieldDiff], list[Any]]:
"""Get first field diff with given name.
:param name: The name of the field diff.
:param return_diff: If ``True`` the result is returned as field diff instances, otherwise the result is
returned as value instances.
:return: A ``FieldDiff`` instance.
"""
type_, names = self._mapper[name]
if not issubclass(type_, IncrementalFieldDiff):
return self._get_one(names[0], return_diff)
return [self._get_one(name, return_diff) for name in names]
def _get_one(self, name: str, return_diff: bool) -> Union[FieldDiff, Any]:
diff = getattr(self, name)
if return_diff:
return diff
return diff.value
[docs] def get_all(self, return_diff: bool = True) -> dict[str, Union[FieldDiff, Any, list[FieldDiff], list[Any]]]:
"""Get a dictionary containing all names as keys and all the values of each one as values.
:param return_diff: If ``True`` the result is returned as field diff instances, otherwise the result is
returned as value instances.
:return: A ``dict`` with ``str`` keys and ``list[Any]`` values.
"""
return {key: self.get_one(key, return_diff) for key in self.keys()}
[docs] def flatten_items(self) -> Iterator[tuple[str, FieldDiff]]:
"""Get the field differences in a flatten way.
:return: An ``Iterator`` of ``tuple[str, FieldDiff]`` instances.
"""
return map(lambda diff: (diff.name, diff), self.flatten_values())
[docs] def flatten_values(self) -> Iterator[FieldDiff]:
"""Get the field differences in a flatten way.
:return: An ``Iterator`` of ``FieldDiff`` instances.
"""
iterable = (v if isinstance(v, list) else [v] for v in self.values())
return chain.from_iterable(iterable)
[docs] @classmethod
def from_difference(cls, a: Model, b: Model, ignore: set[str] = frozenset()) -> FieldDiffContainer:
"""Build a new instance from the difference between two models.
:param a: Latest model instance.
:param b: Oldest model instance.
:param ignore: Set of fields to be ignored.
:return: A new ``FieldDiffContainer`` instance.
"""
logger.debug(f"Computing the {cls!r} between {a!r} and {b!r}...")
differences = cls._diff(a.fields, b.fields)
differences = [difference for difference in differences if difference.name not in ignore]
return cls(differences)
@staticmethod
def _diff(a: dict[str, Field], b: dict[str, Field]) -> list[FieldDiff]:
differences = list()
for a_name, a_field in a.items():
if a_name not in b or a_field != b[a_name]:
if isinstance(a_field.value, IncrementalSet):
diffs = a_field.value.diff(b[a_name].value).diffs
for diff in diffs:
differences.append(
IncrementalFieldDiff(a_name, a_field.value.data_cls, diff.entity, diff.action)
)
else:
differences.append(FieldDiff(a_name, a_field.type, a_field.value))
return differences
[docs] @classmethod
def from_model(cls, model: Model, ignore: set[str] = frozenset()) -> FieldDiffContainer:
"""Build a new difference from a single model.
:param model: The model instance.
:param ignore: Set of fields to be ignored.
:return: A new ``FieldDiffContainer`` instance.
"""
differences = list()
for field in model.fields.values():
differences.append(FieldDiff(field.name, field.type, field.value))
differences = [difference for difference in differences if difference.name not in ignore]
return cls(differences)
[docs] @staticmethod
def generate_random_str() -> str:
"""Generate a random string
:return: A random string value.
"""
return str(uuid4())
def _build_mapper(fields: dict[str, Field]) -> dict[str, tuple[type, list[str]]]:
mapper = defaultdict(list)
for name, field in fields.items():
mapper[field.value.name].append(name)
mapper = dict(mapper)
ans = dict()
for k, names in mapper.items():
types = {type(fields[name].value) for name in names}
if len(types) > 1:
raise ValueError(f"Multiple {FieldDiff.__name__!r} types have been provided to {k!r}: {types}")
type_ = next(iter(types))
if len(names) > 1 and not issubclass(type_, IncrementalFieldDiff):
raise ValueError(f"Only {IncrementalFieldDiff.__name__!r} type allow multiple {k!r} values.")
ans[k] = (type_, names)
return ans