Source code for minos.aggregate.snapshots.services

from __future__ import (
    annotations,
)

import logging
from asyncio import (
    gather,
)
from typing import (
    TYPE_CHECKING,
)
from uuid import (
    UUID,
)

from cached_property import (
    cached_property,
)

from minos.common import (
    Config,
    Inject,
    ModelType,
)
from minos.networks import (
    EnrouteDecorator,
    Request,
    Response,
    ResponseException,
    enroute,
)

from .repositories import (
    SnapshotRepository,
)

if TYPE_CHECKING:
    from ..entities import (
        RootEntity,
    )

logger = logging.getLogger(__name__)


[docs]class SnapshotService: """Snapshot Service class.""" # noinspection PyUnusedLocal
[docs] @Inject() def __init__( self, *args, config: Config, snapshot_repository: SnapshotRepository, **kwargs, ): self.config = config self.snapshot_repository = snapshot_repository
@classmethod def __get_enroute__(cls, config: Config) -> dict[str, set[EnrouteDecorator]]: from ..entities import ( RefResolver, ) aggregate_config = config.get_aggregate() root_entity = aggregate_config["entities"][0] name = root_entity.__name__ return { cls.__get_many__.__name__: {enroute.broker.command(RefResolver.build_topic_name(name))}, cls.__synchronize__.__name__: {enroute.periodic.event("* * * * *")}, } async def __get_many__(self, request: Request) -> Response: """Get many ``RootEntity`` instances. :param request: The ``Request`` instance that contains the instance identifiers. :return: A ``Response`` instance containing the requested instances. """ try: content = await request.content(model_type=ModelType.build("Query", {"uuids": list[UUID]})) except Exception as exc: raise ResponseException(f"There was a problem while parsing the given request: {exc!r}") try: instances = await gather(*(self.type_.get(uuid) for uuid in content["uuids"])) except Exception as exc: raise ResponseException(f"There was a problem while getting the instances: {exc!r}") return Response(instances) @cached_property def type_(self) -> type[RootEntity]: """Load the concrete ``RootEntity`` class. :return: A ``Type`` object. """ aggregate_config = self.config.get_aggregate() return aggregate_config["entities"][0] # noinspection PyUnusedLocal async def __synchronize__(self, request: Request) -> None: """Performs a Snapshot synchronization every minute. :param request: A request containing information related with scheduling. :return: This method does not return anything. """ logger.info("Performing periodic Snapshot synchronization...") await self.snapshot_repository.synchronize()