Source code for minos.common.config.v1

from __future__ import (
    annotations,
)

from contextlib import (
    suppress,
)
from typing import (
    TYPE_CHECKING,
    Any,
)

from ..exceptions import (
    MinosConfigException,
)
from ..importlib import (
    import_module,
)
from .abc import (
    Config,
)

if TYPE_CHECKING:
    from ..injections import (
        InjectableMixin,
    )


class ConfigV1(Config):
    """Version 1 of the Minos configuration format.

    A Minos configuration provides information on the connection points available at that service.
    It consists of the following parts:

    - Service meta-information (such as name, or version).
    - REST Service endpoints available.
    - Repository database connection for event sourcing.
    - Snapshot database connection.
    - Events it publishes/consumes from the given Kafka service.
    - Commands it reacts to from other microservices.
    - Sagas it takes part on.
    """

    # Dotted configuration key -> environment variable name (see ``_to_environment_variable``).
    _ENVIRONMENT_MAPPER = {
        "service.name": "MINOS_SERVICE_NAME",
        "rest.host": "MINOS_REST_HOST",
        "rest.port": "MINOS_REST_PORT",
        "broker.host": "MINOS_BROKER_HOST",
        "broker.port": "MINOS_BROKER_PORT",
        "broker.queue.host": "MINOS_BROKER_QUEUE_HOST",
        "broker.queue.port": "MINOS_BROKER_QUEUE_PORT",
        "broker.queue.database": "MINOS_BROKER_QUEUE_DATABASE",
        "broker.queue.user": "MINOS_BROKER_QUEUE_USER",
        "broker.queue.password": "MINOS_BROKER_QUEUE_PASSWORD",
        "commands.service": "MINOS_COMMANDS_SERVICE",
        "queries.service": "MINOS_QUERIES_SERVICE",
        "repository.host": "MINOS_REPOSITORY_HOST",
        "repository.port": "MINOS_REPOSITORY_PORT",
        "repository.database": "MINOS_REPOSITORY_DATABASE",
        "repository.user": "MINOS_REPOSITORY_USER",
        "repository.password": "MINOS_REPOSITORY_PASSWORD",
        "snapshot.host": "MINOS_SNAPSHOT_HOST",
        "snapshot.port": "MINOS_SNAPSHOT_PORT",
        "snapshot.database": "MINOS_SNAPSHOT_DATABASE",
        "snapshot.user": "MINOS_SNAPSHOT_USER",
        "snapshot.password": "MINOS_SNAPSHOT_PASSWORD",
        "discovery.client": "MINOS_DISCOVERY_CLIENT",
        "discovery.host": "MINOS_DISCOVERY_HOST",
        "discovery.port": "MINOS_DISCOVERY_PORT",
    }

    # Dotted configuration key -> parameterized variable name (see ``_to_parameterized_variable``).
    _PARAMETERIZED_MAPPER = {
        "service.name": "service_name",
        "service.injections": "service_injections",
        "rest.host": "rest_host",
        "rest.port": "rest_port",
        "broker.host": "broker_host",
        "broker.port": "broker_port",
        "broker.queue.host": "broker_queue_host",
        "broker.queue.port": "broker_queue_port",
        "broker.queue.database": "broker_queue_database",
        "broker.queue.user": "broker_queue_user",
        "broker.queue.password": "broker_queue_password",
        "commands.service": "commands_service",
        "queries.service": "queries_service",
        "saga.broker": "saga_broker",
        "saga.port": "saga_port",
        "repository.host": "repository_host",
        "repository.port": "repository_port",
        "repository.database": "repository_database",
        "repository.user": "repository_user",
        "repository.password": "repository_password",
        "snapshot.host": "snapshot_host",
        "snapshot.port": "snapshot_port",
        "snapshot.database": "snapshot_database",
        "snapshot.user": "snapshot_user",
        "snapshot.password": "snapshot_password",
        "discovery.client": "minos_discovery_client",
        "discovery.host": "minos_discovery_host",
        "discovery.port": "minos_discovery_port",
    }

    @property
    def _version(self) -> int:
        """Version number of this configuration format (always ``1``)."""
        return 1

    def _get_name(self) -> str:
        """Return the value stored under the ``service.name`` key."""
        return self.get_by_key("service.name")

    def _get_aggregate(self) -> dict[str, Any]:
        """Build the aggregate section.

        :return: A dictionary with the type resolved from ``service.aggregate`` as the only entity and an
            empty ``repositories`` mapping.
        """
        return {
            "entities": [self.get_type_by_key("service.aggregate")],
            "repositories": dict(),
        }

    def _get_saga(self) -> dict[str, Any]:
        """Return the ``saga`` section without its ``storage`` entry.

        :return: The saga settings, or an empty dictionary when the ``saga`` key is not available.
        """
        try:
            saga = self.get_by_key("saga")
        except MinosConfigException:
            saga = dict()
        # The storage settings are exposed through ``_get_database_saga`` instead.
        saga.pop("storage", None)
        return saga

    def _get_injections(self) -> list[type[InjectableMixin]]:
        """Return the injection classes, replacing pool related entries by the pool factory.

        :return: The imported ``service.injections`` classes excluding ``Pool`` and ``PoolFactory`` subclasses,
            with the pool factory (when one is obtained from ``_get_pools``) inserted at the first position.
        """
        from ..pools import (
            Pool,
            PoolFactory,
        )

        injections = [
            injection
            for injection in self._get_raw_injections()
            if not issubclass(injection, (Pool, PoolFactory))
        ]

        with suppress(MinosConfigException):
            pool_factory = self._get_pools().get("factory")
            if pool_factory is not None:
                # noinspection PyTypeChecker
                injections.insert(0, pool_factory)

        # noinspection PyTypeChecker
        return injections

    def _get_raw_injections(self) -> list[type[InjectableMixin]]:
        """Import every class listed under ``service.injections``.

        :return: The imported objects, or an empty list when the key is not available. When the stored value is
            a dictionary, only its values are used.
        """
        try:
            injections = self.get_by_key("service.injections")
            if isinstance(injections, dict):
                injections = list(injections.values())
        except MinosConfigException:
            injections = list()

        # noinspection PyTypeChecker
        return [import_module(classname) for classname in injections]

    def _get_interfaces(self) -> dict[str, dict[str, Any]]:
        """Collect the available interfaces.

        :return: A dictionary containing the ``http``, ``broker`` and ``periodic`` entries whose lookup did not
            raise ``MinosConfigException``.
        """
        interfaces = dict()

        with suppress(MinosConfigException):
            interfaces["http"] = self._get_interface_http()

        with suppress(MinosConfigException):
            interfaces["broker"] = self._get_interface_broker()

        with suppress(MinosConfigException):
            interfaces["periodic"] = self._get_interface_periodic()

        return interfaces

    def _get_interface_http(self) -> dict[str, Any]:
        """Build the ``http`` interface section.

        The port is the first ``service.services`` entry whose lower-cased name contains ``http`` or ``rest``.

        :return: A dictionary with the imported ``port`` and the ``rest`` section as ``connector`` (empty when
            that section is not available).
        :raises MinosConfigException: If no matching entry can be obtained.
        """
        try:
            port = next(
                port
                for port in self.get_by_key("service.services")
                if ("http" in port.lower() or "rest" in port.lower())
            )
        except Exception as exc:
            # Any failure here (missing key, no match, ...) means that the interface is not configured.
            raise MinosConfigException(f"The 'http' interface is not available: {exc!r}") from exc

        try:
            connector = self.get_by_key("rest")
        except MinosConfigException:
            connector = dict()

        return {
            "port": import_module(port),
            "connector": connector,
        }

    def _get_interface_broker(self) -> dict[str, Any]:
        """Build the ``broker`` interface section.

        The port is the first ``service.services`` entry whose lower-cased name contains ``broker``.

        :return: A dictionary with the imported ``port``, empty ``publisher`` and ``subscriber`` mappings and the
            ``broker`` section as ``common``, whose ``queue`` entry has the connection related keys removed.
        :raises MinosConfigException: If no matching entry can be obtained.
        """
        try:
            port = next(port for port in self.get_by_key("service.services") if "broker" in port.lower())
        except Exception as exc:
            # Any failure here (missing key, no match, ...) means that the interface is not configured.
            raise MinosConfigException(f"The 'broker' interface is not available: {exc!r}") from exc

        try:
            common = self.get_by_key("broker")
        except MinosConfigException:
            common = dict()

        try:
            queue = self.get_by_key("broker.queue")
        except MinosConfigException:
            queue = dict()
        else:
            # The connection settings are exposed through ``_get_database_broker`` instead.
            for key in ("database", "port", "host", "user", "password"):
                queue.pop(key, None)
        common["queue"] = queue

        return {
            "port": import_module(port),
            "publisher": dict(),
            "subscriber": dict(),
            "common": common,
        }

    def _get_interface_periodic(self) -> dict[str, Any]:
        """Build the ``periodic`` interface section.

        The port is the first ``service.services`` entry whose lower-cased name contains ``periodic``.

        :return: A dictionary with the imported ``port``.
        :raises MinosConfigException: If no matching entry can be obtained.
        """
        try:
            port = next(port for port in self.get_by_key("service.services") if "periodic" in port.lower())
        except Exception as exc:
            # Any failure here (missing key, no match, ...) means that the interface is not configured.
            raise MinosConfigException(f"The 'periodic' interface is not available: {exc!r}") from exc

        return {
            "port": import_module(port),
        }

    def _get_services(self) -> list[type]:
        """Import every class listed under ``services`` (empty list when the key is not available)."""
        try:
            services = self.get_by_key("services")
        except MinosConfigException:
            services = list()
        return [import_module(classname) for classname in services]

    def _get_pools(self) -> dict[str, type]:
        """Build the pools section from the raw injections.

        :return: An empty dictionary when no ``Pool`` subclass is listed among the injections. Otherwise, a
            dictionary with the ``factory`` (first ``PoolFactory`` subclass among the injections, or
            ``PoolFactory`` itself) and the ``types`` mapping, keyed by ``lock``, ``database`` or ``broker``
            according to the lower-cased class name.
        """
        from ..pools import (
            Pool,
            PoolFactory,
        )

        # Resolve the injections only once: every call imports all the listed classes again.
        raw_injections = self._get_raw_injections()

        factory = next(
            (injection for injection in raw_injections if issubclass(injection, PoolFactory)), PoolFactory
        )
        injections = [injection for injection in raw_injections if issubclass(injection, Pool)]
        if not injections:
            return dict()

        types = dict()
        for injection in injections:
            name = injection.__name__.lower()
            if "lock" in name:
                types["lock"] = injection
            elif "database" in name:
                types["database"] = injection
            elif "broker" in name:
                types["broker"] = injection

        return {
            "factory": factory,
            "types": types,
        }

    def _get_routers(self) -> list[type]:
        """Import every class listed under ``routers`` (empty list when the key is not available)."""
        try:
            routers = self.get_by_key("routers")
        except MinosConfigException:
            routers = list()
        return [import_module(classname) for classname in routers]

    def _get_middleware(self) -> list[type]:
        """Import every callable listed under ``middleware`` (empty list when the key is not available)."""
        try:
            middleware = self.get_by_key("middleware")
        except MinosConfigException:
            middleware = list()
        return [import_module(classname) for classname in middleware]

    def _get_databases(self) -> dict[str, dict[str, Any]]:
        """Collect the available database sections.

        :return: A dictionary containing the ``broker``, ``event``, ``snapshot``, ``saga``, ``query`` and
            ``default`` entries whose lookup did not raise ``MinosConfigException``.
        """
        databases = dict()

        with suppress(MinosConfigException):
            databases["broker"] = self._get_database_broker()

        with suppress(MinosConfigException):
            databases["event"] = self._get_database_event()

        with suppress(MinosConfigException):
            databases["snapshot"] = self._get_database_snapshot()

        with suppress(MinosConfigException):
            databases["saga"] = self._get_database_saga()

        with suppress(MinosConfigException):
            databases["query"] = self._get_database_query()

        # The default database is read from the same section as the event one.
        with suppress(MinosConfigException):
            databases["default"] = self._get_database_event()

        return databases

    def _get_database_broker(self) -> dict[str, Any]:
        """Return the database settings stored under ``broker.queue``."""
        return self._get_database_by_name("broker.queue")

    def _get_database_saga(self) -> dict[str, Any]:
        """Return the database settings stored under ``saga.storage``."""
        return self._get_database_by_name("saga.storage")

    def _get_database_event(self) -> dict[str, Any]:
        """Return the database settings stored under ``repository``."""
        return self._get_database_by_name("repository")

    def _get_database_query(self) -> dict[str, Any]:
        """Return the database settings stored under ``query_repository``."""
        return self._get_database_by_name("query_repository")

    def _get_database_snapshot(self) -> dict[str, Any]:
        """Return the database settings stored under ``snapshot``."""
        return self._get_database_by_name("snapshot")

    def _get_database_by_name(self, prefix: str) -> dict[str, Any]:
        """Return the database settings stored under the given key.

        :param prefix: The configuration key of the database section.
        :return: The section without its ``records`` and ``retry`` entries and, if present, with the ``client``
            entry replaced by the imported object.
        """
        data = self.get_by_key(prefix)
        data.pop("records", None)
        data.pop("retry", None)
        if "client" in data:
            data["client"] = import_module(data["client"])
        return data

    def _get_discovery(self) -> dict[str, Any]:
        """Return the ``discovery`` section with its ``client`` entry resolved through ``get_type_by_key``."""
        data = self.get_by_key("discovery")
        data["client"] = self.get_type_by_key("discovery.client")
        return data

    def _to_parameterized_variable(self, key: str) -> str:
        """Return the parameterized variable name mapped to the given key.

        :param key: A dotted configuration key.
        :raises KeyError: If the key is not in ``_PARAMETERIZED_MAPPER``.
        """
        return self._PARAMETERIZED_MAPPER[key]

    def _to_environment_variable(self, key: str) -> str:
        """Return the environment variable name mapped to the given key.

        :param key: A dotted configuration key.
        :raises KeyError: If the key is not in ``_ENVIRONMENT_MAPPER``.
        """
        return self._ENVIRONMENT_MAPPER[key]