Source code for minos.networks.scheduling.ports

import logging
import warnings

from cached_property import (
    cached_property,
)

from minos.common import (
    Port,
)

from .schedulers import (
    PeriodicTaskScheduler,
)

logger = logging.getLogger(__name__)


[docs]class PeriodicPort(Port): """Periodic Port class."""
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self._init_kwargs = kwargs
async def _start(self) -> None: await self.scheduler.setup() try: self.start_event.set() except RuntimeError: logger.warning("Runtime is not properly setup.") await self.scheduler.start() async def _stop(self, exception: Exception = None) -> None: await self.scheduler.stop() await self.scheduler.destroy() @cached_property def scheduler(self) -> PeriodicTaskScheduler: """Get the service scheduler. :return: A ``PeriodicTaskScheduler`` instance. """ return PeriodicTaskScheduler.from_config(**self._init_kwargs)
[docs]class PeriodicTaskSchedulerService(PeriodicPort): """Periodic Task Scheduler Service class."""
[docs] def __init__(self, **kwargs): warnings.warn( f"{PeriodicTaskSchedulerService!r} has been deprecated. Use {PeriodicPort} instead.", DeprecationWarning, ) super().__init__(**kwargs)