Source code for minos.networks.decorators.callables.checkers

from __future__ import (
    annotations,
)

import asyncio
import time
from asyncio import (
    gather,
    iscoroutinefunction,
)
from collections.abc import (
    Awaitable,
    Callable,
    Iterable,
)
from functools import (
    wraps,
)
from inspect import (
    isawaitable,
)
from typing import (
    Optional,
    Protocol,
    Union,
    runtime_checkable,
)

from cached_property import (
    cached_property,
)

from ...exceptions import (
    NotSatisfiedCheckerException,
)
from ...requests import (
    Request,
)

Checker = Callable[[Request], Union[Optional[bool], Awaitable[Optional[bool]]]]


[docs]@runtime_checkable class CheckerWrapper(Protocol): """Checker Wrapper class.""" meta: CheckerMeta __call__: Checker
[docs]class CheckerMeta: """Checker Meta class.""" func: Checker max_attempts: int delay: float
[docs] def __init__(self, func: Checker, max_attempts: int, delay: float): self.func = func self.max_attempts = max_attempts self.delay = delay
[docs] @staticmethod async def run_async(metas: set[CheckerMeta], *args, **kwargs) -> None: """Run a set of checkers asynchronously. :param metas: The set of checkers. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ metas = list(metas) futures = [meta.async_wrapper(*args, **kwargs) for meta in metas] for satisfied, meta in zip(await gather(*futures), metas): if not satisfied: raise NotSatisfiedCheckerException(f"{meta.wrapper!r} is not satisfied.")
[docs] @staticmethod def run_sync(metas: set[CheckerMeta], *args, **kwargs) -> bool: """Run a set of checkers synchronously. :param metas: The set of checkers. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ for meta in metas: satisfied = meta.sync_wrapper(*args, **kwargs) if not satisfied: raise NotSatisfiedCheckerException(f"{meta.wrapper!r} is not satisfied.") return True
@property def wrapper(self) -> CheckerWrapper: """Get the ``HandlerWrapper`` instance. :return: A ``HandlerWrapper`` instance. """ if iscoroutinefunction(self.func): return self.async_wrapper else: return self.sync_wrapper @cached_property def async_wrapper(self) -> CheckerWrapper: """Get the async ``HandlerWrapper`` instance. :return: A ``HandlerWrapper`` instance. """ @wraps(self.func) async def _wrapper(*args, **kwargs) -> bool: r = 0 while r < self.max_attempts: satisfied = self.func(*args, **kwargs) if isawaitable(satisfied): satisfied = await satisfied if satisfied: return True await asyncio.sleep(self.delay) r += 1 return False _wrapper.meta = self return _wrapper @cached_property def sync_wrapper(self) -> CheckerWrapper: """Get the sync ``HandlerWrapper`` instance. :return: A ``HandlerWrapper`` instance. """ if iscoroutinefunction(self.func): raise ValueError(f"{self.func!r} cannot be awaitable.") @wraps(self.func) def _wrapper(*args, **kwargs) -> bool: r = 0 while r < self.max_attempts: satisfied = self.func(*args, **kwargs) if satisfied: return True time.sleep(self.delay) r += 1 return False _wrapper.meta = self return _wrapper def __repr__(self): args = ", ".join(map(repr, self)) return f"{type(self).__name__}({args})" def __eq__(self, other: CheckerMeta) -> bool: return isinstance(other, type(self)) and tuple(self) == tuple(other) def __hash__(self) -> int: return hash(tuple(self)) def __iter__(self) -> Iterable: yield from ( self.func, self.max_attempts, self.delay, )