Source code for minos.plugins.aiopg.clients

from __future__ import (
    annotations,
)

import logging
from asyncio import (
    TimeoutError,
)
from collections.abc import (
    AsyncIterator,
    Iterable,
)
from functools import (
    partial,
)
from typing import (
    Optional,
)

import aiopg
from aiopg import (
    Connection,
    Cursor,
)
from psycopg2 import (
    IntegrityError,
    OperationalError,
    ProgrammingError,
)

from minos.common import (
    CircuitBreakerMixin,
    ConnectionException,
    DatabaseClient,
    IntegrityException,
    ProgrammingException,
)

from .operations import (
    AiopgDatabaseOperation,
)

logger = logging.getLogger(__name__)


[docs]class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin): """Aiopg Database Client class.""" _connection: Optional[Connection] _cursor: Optional[Cursor]
[docs] def __init__( self, database: str, host: Optional[str] = None, port: Optional[int] = None, user: Optional[str] = None, password: Optional[str] = None, circuit_breaker_exceptions: Iterable[type] = tuple(), connection_timeout: Optional[float] = None, cursor_timeout: Optional[float] = None, *args, **kwargs, ): super().__init__( *args, **kwargs, circuit_breaker_exceptions=(ConnectionException, *circuit_breaker_exceptions), ) if host is None: host = "localhost" if port is None: port = 5432 if user is None: user = "postgres" if password is None: password = "" if connection_timeout is None: connection_timeout = 1 if cursor_timeout is None: cursor_timeout = 60 self._database = database self._host = host self._port = port self._user = user self._password = password self._connection_timeout = connection_timeout self._cursor_timeout = cursor_timeout self._connection = None self._cursor = None
async def _setup(self) -> None: await super()._setup() await self.recreate() async def _destroy(self) -> None: await super()._destroy() await self.close()
[docs] async def recreate(self) -> None: """Recreate the database connection. :return: This method does not return anything. """ await self.close() self._connection = await self.with_circuit_breaker(self._connect) logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
async def _connect(self) -> Connection: try: return await aiopg.connect( timeout=self._connection_timeout, host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password, ) except (OperationalError, TimeoutError) as exc: raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")
[docs] async def close(self) -> None: """Close database connection. :return: This method does not return anything. """ if await self.is_connected(): await self._connection.close() if self._connection is not None: logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") self._connection = None
[docs] async def is_connected(self) -> bool: """Check if the client is connected. :return: ``True`` if it is connected or ``False`` otherwise. """ if self._connection is None: return False try: # This operation connects to the database and raises an exception if something goes wrong. self._connection.isolation_level except OperationalError: return False return not self._connection.closed
async def _reset(self, **kwargs) -> None: await self._destroy_cursor(**kwargs) # noinspection PyUnusedLocal async def _fetch_all(self) -> AsyncIterator[tuple]: if self._cursor is None: raise ProgrammingException("An operation must be executed before fetching any value.") try: async for row in self._cursor: yield row except ProgrammingError as exc: raise ProgrammingException(str(exc)) except OperationalError as exc: raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") # noinspection PyUnusedLocal async def _execute(self, operation: AiopgDatabaseOperation) -> None: if not isinstance(operation, AiopgDatabaseOperation): raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}") fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters) await self.with_circuit_breaker(fn) async def _execute_cursor(self, operation: str, parameters: dict): if not await self.is_connected(): await self.recreate() self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) try: await self._cursor.execute(operation=operation, parameters=parameters) except OperationalError as exc: raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") except IntegrityError as exc: raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") async def _destroy_cursor(self, **kwargs): if self._cursor is not None: if not self._cursor.closed: self._cursor.close() self._cursor = None @property def cursor(self) -> Optional[Cursor]: """Get the cursor. :return: A ``Cursor`` instance. """ return self._cursor @property def connection(self) -> Optional[Connection]: """Get the connection. :return: A ``Connection`` instance. """ return self._connection @property def database(self) -> str: """Get the database's database. :return: A ``str`` value. """ return self._database @property def host(self) -> str: """Get the database's host. :return: A ``str`` value. """ return self._host @property def port(self) -> int: """Get the database's port. :return: An ``int`` value. """ return self._port @property def user(self) -> str: """Get the database's user. :return: A ``str`` value. """ return self._user @property def password(self) -> str: """Get the database's password. :return: A ``str`` value. """ return self._password