Source code for minos.common.ports

import logging
from abc import (
    ABC,
    abstractmethod,
)

from aiomisc import (
    Service,
)

logger = logging.getLogger(__name__)


[docs]class Port(Service, ABC): """Port base class."""
[docs] async def start(self) -> None: """Start the port execution. :return: This method does not return anything. """ try: return await self._start() except Exception as exc: logger.exception(f"Raised an exception on {self!r} start: {exc!r}") raise exc
@abstractmethod def _start(self): raise NotImplementedError
[docs] async def stop(self, err: Exception = None) -> None: """Stop the port execution. :param err: Optional exception that stopped the execution. :return: This method does not return anything. """ try: return await self._stop(err) except Exception as exc: logger.exception(f"Raised an exception on {self!r} stop: {exc!r}") raise exc
@abstractmethod async def _stop(self, err: Exception = None) -> None: raise NotImplementedError