from __future__ import (
annotations,
)
import asyncio
import logging
import traceback
from contextlib import (
suppress,
)
from datetime import (
datetime,
)
from inspect import (
isawaitable,
)
from typing import (
Awaitable,
Callable,
NoReturn,
Optional,
Union,
)
from crontab import CronTab as CronTabImpl
from minos.common import (
Config,
SetupMixin,
current_datetime,
)
from ..decorators import (
EnrouteFactory,
)
from ..requests import (
ResponseException,
)
from .crontab import (
CronTab,
)
from .requests import (
ScheduledRequest,
)
logger = logging.getLogger(__name__)
[docs]class PeriodicTaskScheduler(SetupMixin):
"""Periodic Task Scheduler class."""
[docs] def __init__(self, tasks: set[PeriodicTask], *args, **kwargs):
super().__init__(*args, **kwargs)
self._tasks = tasks
@classmethod
def _from_config(cls, config: Config, **kwargs) -> PeriodicTaskScheduler:
tasks = cls._tasks_from_config(config, **kwargs)
return cls(tasks, **kwargs)
@staticmethod
def _tasks_from_config(config: Config, **kwargs) -> set[PeriodicTask]:
builder = EnrouteFactory(*config.get_services(), middleware=config.get_middleware())
decorators = builder.get_periodic_event(config=config, **kwargs)
tasks = {PeriodicTask(decorator.crontab, fn) for decorator, fn in decorators.items()}
return tasks
@property
def tasks(self) -> set[PeriodicTask]:
"""Get the set of periodic tasks.
:return: A ``set`` of ``PeriodicTask`` instances.
"""
return self._tasks
[docs] async def start(self) -> None:
"""Start the execution of periodic tasks.
:return: This method does not return anything.
"""
await asyncio.gather(*(task.start() for task in self._tasks))
[docs] async def stop(self, timeout: Optional[float] = None) -> None:
"""Stop the execution of periodic tasks.
:param timeout: An optional timeout expressed in seconds.
:return: This method does not return anything.
"""
await asyncio.gather(*(task.stop(timeout=timeout) for task in self._tasks))
[docs]class PeriodicTask:
"""Periodic Task class."""
_task: Optional[asyncio.Task]
[docs] def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]):
if not isinstance(crontab, CronTab):
crontab = CronTab(crontab)
self._crontab = crontab
self._fn = fn
self._task = None
self._running = False
@property
def crontab(self) -> CronTab:
"""Get the crontab of the periodic task.
:return: A ``CronTab`` instance.
"""
return self._crontab
@property
def fn(self) -> Callable[[ScheduledRequest], Awaitable[None]]:
"""Get the function to be called periodically.
:return: A function returning an awaitable.
"""
return self._fn
@property
def started(self) -> bool:
"""Check if the periodic task has been started.
:return: ``True`` if started or ``False`` otherwise.
"""
return self._task is not None
@property
def task(self) -> asyncio.Task:
"""Get the asyncio task.
:return: An ``asyncio.Task`` instance.
"""
return self._task
[docs] async def start(self) -> None:
"""Start the periodic task.
:return: This method does not return anything.
"""
logger.info("Starting periodic task...")
self._task = asyncio.create_task(self.run_forever())
[docs] async def stop(self, timeout: Optional[float] = None) -> None:
"""Stop the periodic task.
:param timeout: An optional timeout expressed in seconds.
:return: This method does not return anything.
"""
if self._task is not None:
logger.info("Stopping periodic task...")
self._task.cancel()
with suppress(asyncio.TimeoutError, asyncio.CancelledError):
await asyncio.wait_for(self._task, timeout)
self._task = None
[docs] async def run_forever(self) -> NoReturn:
"""Run the periodic function forever. This method is equivalent to start, but it keeps waiting until infinite.
:return: This method never returns.
"""
async for now in self._crontab:
await self.run_once(now)
@property
def running(self) -> bool:
"""Check if the periodic function is running.
:return: ``True`` if it's running or ``False`` otherwise.
"""
return self._running
[docs] async def run_once(self, now: Optional[datetime] = None) -> None:
"""Run the periodic function one time.
:param now: An optional datetime expressing the current datetime.
:return: This method does not return anything.
"""
if now is None:
now = current_datetime()
request = ScheduledRequest(now)
logger.debug("Running periodic task...")
# noinspection PyBroadException
try:
self._running = True
with suppress(asyncio.CancelledError):
response = self._fn(request)
if isawaitable(response):
await response
except ResponseException:
tb = traceback.format_exc()
logger.error(f"Raised an application exception:\n {tb}")
except Exception:
tb = traceback.format_exc()
logger.exception(f"Raised a system exception:\n {tb}")
finally:
self._running = False