Source code for minos.common.launchers

from __future__ import (
    annotations,
)

import logging
import warnings
from asyncio import (
    AbstractEventLoop,
    gather,
)
from enum import (
    Enum,
)
from types import (
    ModuleType,
)
from typing import (
    NoReturn,
    Optional,
    Union,
)

from aiomisc.entrypoint import (
    Entrypoint,
)
from aiomisc.log import (
    LogFormat,
    basic_config,
)
from aiomisc.utils import (
    create_default_event_loop,
)
from aiomisc_log.enum import (
    DateFormat,
)
from cached_property import (
    cached_property,
)

from .config import (
    Config,
)
from .importlib import (
    get_internal_modules,
    import_module,
)
from .injections import (
    DependencyInjector,
    InjectableMixin,
)
from .ports import (
    Port,
)
from .setup import (
    SetupMixin,
)

logger = logging.getLogger(__name__)


def _create_entrypoint(*args, **kwargs) -> Entrypoint:  # pragma: no cover
    return Entrypoint(*args, **kwargs)


def _create_loop() -> AbstractEventLoop:  # pragma: no cover
    return create_default_event_loop()[0]


[docs]class EntrypointLauncher(SetupMixin): """EntryPoint Launcher class."""
[docs] def __init__( self, config: Config, injections: list[Union[SetupMixin, type[SetupMixin], str]], ports: list[Union[Port, type[Port], str]], log_level: Union[int, str] = logging.INFO, log_format: Union[str, LogFormat] = "color", log_date_format: Union[str, DateFormat] = DateFormat["color"], external_modules: Optional[list[ModuleType]] = None, external_packages: Optional[list[str]] = None, *args, **kwargs, ): if external_modules is None: external_modules = list() if external_packages is None: external_packages = list() super().__init__(*args, **kwargs) if isinstance(log_date_format, Enum): log_date_format = log_date_format.value self.config = config self._log_level = log_level self._log_format = log_format self._log_date_format = log_date_format self._raw_injections = injections self._raw_ports = ports self._external_modules = external_modules self._external_packages = external_packages
@classmethod def _from_config(cls, *args, config: Config, **kwargs) -> EntrypointLauncher: if "injections" not in kwargs: kwargs["injections"] = config.get_injections() if "ports" not in kwargs: kwargs["ports"] = [ interface["port"] for interface in config.get_interfaces().values() if "port" in interface ] return cls(config, *args, **kwargs)
[docs] def launch(self) -> NoReturn: """Launch a new execution and keeps running forever.. :return: This method does not return anything. """ basic_config( level=self._log_level, log_format=self._log_format, buffered=False, date_format=self._log_date_format ) logger.info("Starting microservice...") exception = None try: self.graceful_launch() logger.info("Microservice is up and running!") self.loop.run_forever() except KeyboardInterrupt as exc: # pragma: no cover logger.info("Stopping microservice...") exception = exc except Exception as exc: # pragma: no cover logger.exception("Stopping microservice due to an unhandled exception...") exception = exc finally: self.graceful_shutdown(exception)
[docs] def graceful_launch(self) -> None: """Launch the execution gracefully. :return: This method does not return anything. """ self.loop.run_until_complete(self.setup())
[docs] def graceful_shutdown(self, err: Exception = None) -> None: """Shutdown the execution gracefully. :return: This method does not return anything. """ self.loop.run_until_complete(self.destroy())
@cached_property def entrypoint(self) -> Entrypoint: """Entrypoint instance. :return: An ``Entrypoint`` instance. """ return _create_entrypoint(*self.ports, loop=self.loop, log_config=False) @cached_property def loop(self) -> AbstractEventLoop: """Create the loop. :return: An ``AbstractEventLoop`` instance. """ return _create_loop() @cached_property def ports(self) -> list[Port]: """List of ports to be launched. :return: A list of ``Port`` instances. """ def _fn(raw: Union[Port, type[Port], str]) -> Port: if isinstance(raw, str): raw = import_module(raw) if isinstance(raw, type): return raw(config=self.config) return raw return [_fn(raw) for raw in self._raw_ports] @property def services(self) -> list[Port]: """List of ports to be launched. :return: A list of ``Port`` instances. """ warnings.warn("'services' property has been deprecated. Use 'ports' instead.", DeprecationWarning) return self.ports async def _setup(self) -> None: """Wire the dependencies and setup it. :return: This method does not return anything. """ modules = self._external_modules + self._internal_modules packages = self._external_packages self.injector.wire_injections(modules=modules, packages=packages) await gather(self.injector.setup_injections(), self.entrypoint.__aenter__()) @property def _internal_modules(self) -> list[ModuleType]: return get_internal_modules() async def _destroy(self) -> None: """Unwire the injected dependencies and destroys it. :return: This method does not return anything. """ await gather(self.entrypoint.__aexit__(None, None, None), self.injector.destroy_injections()) self.injector.unwire_injections() @property def injections(self) -> dict[str, InjectableMixin]: """Get the injections mapping. :return: A ``dict`` with injection names as keys and injection instances as values. """ return self.injector.injections @cached_property def injector(self) -> DependencyInjector: """Dependency injector instance. :return: A ``DependencyInjector`` instance. """ return DependencyInjector(self.config, self._raw_injections)