Source code for minos.common.database.pools

import logging
from asyncio import (
    sleep,
)
from collections.abc import (
    Hashable,
)
from typing import (
    Optional,
)

from aiomisc.pool import (
    ContextManager,
)

from ..config import (
    Config,
)
from ..locks import (
    LockPool,
)
from ..pools import (
    Pool,
)
from .clients import (
    ConnectionException,
    DatabaseClient,
    DatabaseClientBuilder,
)
from .locks import (
    DatabaseLock,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class DatabaseClientPool(Pool[DatabaseClient]):
    """Database Client Pool class.

    Pool whose instances are ``DatabaseClient`` objects produced by a ``DatabaseClientBuilder``.
    """

    def __init__(self, client_builder: DatabaseClientBuilder, *args, **kwargs):
        """Initialize the pool.

        :param client_builder: The builder used to create every new client of the pool.
        :param args: Additional positional arguments forwarded to the parent constructor.
        :param kwargs: Additional named arguments forwarded to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self._client_builder = client_builder

    @classmethod
    def _from_config(cls, config: Config, identifier: Optional[str] = None, **kwargs):
        """Build a new pool from a configuration.

        The ``"client"`` entry of the database configuration may be a ``DatabaseClient`` subclass, a
        ``DatabaseClientBuilder`` subclass, or an already instantiated builder.

        :param config: The configuration from which the database entry is read.
        :param identifier: The name of the database entry; it is also set as the builder's name.
        :param kwargs: Additional named arguments forwarded to the constructor.
        :return: A new instance of ``cls``.
        :raises ValueError: If the database entry does not define a ``"client"``.
        """
        base_builder = config.get_database_by_name(identifier).get("client")

        if base_builder is None:
            raise ValueError(f"{base_builder!r} is not a {DatabaseClientBuilder!r} instance.")

        # ``issubclass`` raises ``TypeError`` when its first argument is not a class, so the class
        # checks only run for classes; an already built builder instance is used as it is.
        if isinstance(base_builder, type):
            if issubclass(base_builder, DatabaseClient):
                base_builder = base_builder.get_builder()
            elif issubclass(base_builder, DatabaseClientBuilder):
                base_builder = base_builder()

        client_builder = base_builder.with_name(identifier).with_config(config)

        return cls(client_builder=client_builder, **kwargs)

    async def _create_instance(self) -> Optional[DatabaseClient]:
        """Build and set up a new client.

        :return: The client that has been set up, or ``None`` if the setup raised a ``ConnectionException``.
        """
        instance = self._client_builder.build()

        try:
            await instance.setup()
        except ConnectionException:
            # NOTE(review): the short pause presumably throttles repeated creation attempts while the
            # database is unreachable -- confirm against the ``Pool`` base class.
            await sleep(0.1)
            return None

        logger.info(f"Created {instance!r}!")
        return instance

    async def _destroy_instance(self, instance: DatabaseClient):
        """Destroy the given client.

        :param instance: The client to be destroyed; ``None`` (a failed creation) is ignored.
        :return: This method does not return anything.
        """
        if instance is None:
            return

        logger.info(f"Destroyed {instance!r}!")
        await instance.destroy()

    async def _check_instance(self, instance: Optional[DatabaseClient]) -> bool:
        """Check whether the given client is usable.

        :param instance: The client to be checked.
        :return: ``False`` if the instance is ``None``, otherwise the result of ``instance.is_valid()``.
        """
        if instance is None:
            return False

        return await instance.is_valid()

    async def _release_instance(self, instance: DatabaseClient) -> None:
        """Reset the given client.

        :param instance: The client to be reset.
        :return: This method does not return anything.
        """
        await instance.reset()

    @property
    def client_cls(self) -> type[DatabaseClient]:
        """Get the instance's class.

        :return: A ``type`` instance that is subclass of ``DatabaseClient``.
        """
        return self.client_builder.instance_cls

    @property
    def client_builder(self) -> DatabaseClientBuilder:
        """Get the client builder class.

        :return: A ``DatabaseClientBuilder`` instance.
        """
        return self._client_builder
class DatabaseLockPool(LockPool, DatabaseClientPool):
    """Database Lock Pool class."""

    def acquire(self, key: Hashable, *args, **kwargs) -> DatabaseLock:
        """Acquire a new lock.

        The returned context manager takes a client from the pool and enters a ``DatabaseLock`` built on it;
        on exit it leaves the lock and then gives the client back.

        :param key: The key to be used for locking.
        :param args: Additional positional arguments forwarded to ``DatabaseLock``.
        :param kwargs: Additional named arguments forwarded to ``DatabaseLock``.
        :return: A ``DatabaseLock`` instance.
        """
        # Start the method lookup after ``DatabaseClientPool`` in the MRO, so that the result is the
        # context manager whose ``__aenter__`` yields a client rather than a lock.
        acquired = super(DatabaseClientPool, self).acquire()

        async def _fn_enter():
            client = await acquired.__aenter__()
            try:
                return await DatabaseLock(client, key, *args, **kwargs).__aenter__()
            except BaseException as exc:
                # The lock could not be entered: give the client back instead of leaking it.
                await acquired.__aexit__(type(exc), exc, exc.__traceback__)
                raise

        async def _fn_exit(lock: DatabaseLock):
            try:
                await lock.__aexit__(None, None, None)
            finally:
                # Always give the client back, even if leaving the lock failed.
                await acquired.__aexit__(None, None, None)

        # noinspection PyTypeChecker
        return ContextManager(_fn_enter, _fn_exit)