Source code for minos.aggregate.entities.refs.extractors

from collections import (
    defaultdict,
)
from operator import (
    attrgetter,
)
from typing import (
    Any,
    Iterable,
    Optional,
    Union,
    get_args,
    get_origin,
)
from uuid import (
    UUID,
)

from minos.common import (
    TypeHintBuilder,
    is_model_type,
)

from .models import (
    Ref,
)


class RefExtractor:
    """Model Reference Extractor class.

    Walks a value together with its type hint and collects every ``Ref`` instance found inside it, grouped by
    the ``__name__`` of the class each reference targets.
    """

    def __init__(self, value: Any, type_: Optional[type] = None, as_uuids: bool = True):
        """Initialize the extractor.

        :param value: The value to be inspected.
        :param type_: The type hint describing ``value``. When ``None``, it is obtained from
            ``TypeHintBuilder(value).build()``.
        :param as_uuids: If ``True``, ``build`` replaces each collected reference by its ``uuid`` attribute;
            otherwise the reference instances themselves are returned.
        """
        if type_ is None:
            type_ = TypeHintBuilder(value).build()
        self.value = value
        self.type_ = type_
        self.as_uuids = as_uuids

    def build(self) -> dict[str, set[UUID]]:
        """Run the model reference extractor.

        :return: A dictionary in which the keys are the class names and the values are the identifiers. When
            ``as_uuids`` is ``False`` the values are sets of the collected reference instances instead, and the
            returned mapping is the ``defaultdict`` used as accumulator.
        """
        ans = defaultdict(set)
        self._build(self.value, self.type_, ans)

        if self.as_uuids:
            ans = {k: set(map(attrgetter("uuid"), v)) for k, v in ans.items()}

        return ans

    def _build(self, value: Any, type_: type, ans: "dict[str, set[Ref]]") -> None:
        """Recursively collect the references contained in ``value`` into ``ans``.

        :param value: The value currently being inspected.
        :param type_: The type hint associated with ``value``.
        :param ans: The accumulator, mapping class names to the references found so far. It is mutated in place.
        :return: This method does not return anything.
        """
        if get_origin(type_) is Union:
            # Prefer the ``Ref[...]`` member of the union; keep the union untouched when there is none.
            type_ = next((t for t in get_args(type_) if get_origin(t) is Ref), type_)

        if isinstance(value, (tuple, list, set)):
            self._build_iterable(value, self._type_argument(type_, 0), ans)

        elif isinstance(value, dict):
            self._build_iterable(value.keys(), self._type_argument(type_, 0), ans)
            self._build_iterable(value.values(), self._type_argument(type_, 1), ans)

        elif isinstance(value, Ref):
            # Resolve the target class: first from the reference itself, then from the type hint's argument,
            # and finally from the argument of the hint declared for the ``data`` field.
            cls = value.data_cls
            if cls is None and (args := get_args(type_)):
                cls = args[0]
            if cls is None and (args := get_args(type_.type_hints["data"])):
                cls = args[0]
            ans[cls.__name__].add(value)

        elif is_model_type(value):
            # noinspection PyUnresolvedReferences
            for field in value.fields.values():
                self._build(field.value, field.type, ans)

    def _build_iterable(self, value: Iterable, value_: type, ans: "dict[str, set[Ref]]") -> None:
        """Collect the references of every item in ``value``, using ``value_`` as the type hint of each item.

        :param value: The iterable whose items are inspected.
        :param value_: The type hint shared by all the items.
        :param ans: The accumulator, mutated in place.
        :return: This method does not return anything.
        """
        for sub_value in value:
            self._build(sub_value, value_, ans)

    @staticmethod
    def _type_argument(type_: Any, index: int) -> Any:
        """Return the ``index``-th type argument of ``type_``, or ``Any`` when the hint does not provide it.

        Un-parameterised hints (e.g. a bare ``list`` or ``dict``) have no arguments, so indexing the result of
        ``get_args`` directly would raise ``IndexError``.

        :param type_: The (possibly generic) type hint.
        :param index: The position of the wanted type argument.
        :return: The requested type argument, or ``Any`` as a fallback.
        """
        args = get_args(type_)
        return args[index] if index < len(args) else Any