Source code for minos.plugins.aiohttp.connectors

import logging
from collections.abc import (
    Callable,
)
from typing import (
    Optional,
)

from aiohttp import (
    web,
    web_runner,
)
from cached_property import (
    cached_property,
)

from minos.networks import (
    HttpConnector,
    Request,
    Response,
)

from .requests import (
    AioHttpRequest,
    AioHttpResponse,
)

logger = logging.getLogger(__name__)


[docs]class AioHttpConnector(HttpConnector[web.Request, web.Response]): """AioHttp Connector class."""
[docs] def __init__(self, *args, shutdown_timeout: float = 6, **kwargs): super().__init__(*args, **kwargs) self._shutdown_timeout = shutdown_timeout self._runner = None self._site = None
@property def shutdown_timeout(self) -> float: """Get the shutdown timeout. :return: A ``float`` value. """ return self._shutdown_timeout @property def runner(self) -> Optional[web_runner.AppRunner]: """Get the application runner. :return: An ``AppRunner`` instance or ``None``. """ return self._runner @property def site(self) -> Optional[web_runner.TCPSite]: """Get the application site. :return: A ``SockSite`` instance or ``None``. """ return self._site @cached_property def application(self) -> web.Application: """Get the application. :return: An ``Application`` instance. """ return web.Application() def _mount_route(self, path: str, method: str, adapted_callback: Callable) -> None: self.application.router.add_route(method, path, adapted_callback) async def _build_request(self, request: web.Request) -> Request: return AioHttpRequest(request) async def _build_response(self, response: Optional[Response]) -> web.Response: if not isinstance(response, AioHttpResponse): response = AioHttpResponse.from_response(response) content = await response.content() content_type = response.content_type status = response.status return web.Response(body=content, content_type=content_type, status=status) async def _build_error_response(self, message: str, status: int) -> web.Response: return web.Response(text=message, status=status) async def _start(self) -> None: self._runner = web_runner.AppRunner(self.application, access_log=None) await self._runner.setup() self._site = web_runner.TCPSite( runner=self._runner, host=self._host, port=self._port, reuse_address=True, reuse_port=False, shutdown_timeout=self._shutdown_timeout, ) await self._site.start() async def _stop(self, exception: Exception = None) -> None: if self._site is not None: await self._site.stop() self._site = None if self._runner is not None: await self._runner.cleanup() self._runner = None