Source code for minos.networks.http.ports

from __future__ import (
    annotations,
)

import logging
import warnings
from typing import (
    Optional,
)

from cached_property import (
    cached_property,
)

from minos.common import (
    Inject,
    NotProvidedException,
    Port,
)

from .connectors import (
    HttpConnector,
)

logger = logging.getLogger(__name__)


[docs]class HttpPort(Port): """Http Port class."""
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self._init_kwargs = kwargs
async def _start(self) -> None: await self.connector.setup() await self.connector.start() async def _stop(self, err: Exception = None) -> None: await self.connector.stop() await self.connector.destroy() @cached_property def connector(self) -> HttpConnector: """Get the port connector. :return: A ``HttpConnector`` instance. """ return self._get_connector(**self._init_kwargs) @staticmethod @Inject() def _get_connector( connector: Optional[HttpConnector] = None, http_connector: Optional[HttpConnector] = None, **kwargs, ) -> HttpConnector: if connector is None: connector = http_connector if connector is None: raise NotProvidedException(f"A {HttpConnector!r} must be provided.") return connector
[docs]class RestService(HttpPort): """Rest Service class."""
[docs] def __init__(self, **kwargs): warnings.warn(f"{RestService!r} has been deprecated. Use {HttpPort} instead.", DeprecationWarning) super().__init__(**kwargs)