20 lines
566 B
Python
20 lines
566 B
Python
"""JARVIS task utilities."""
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from logging import Logger
|
|
from typing import Coroutine
|
|
|
|
|
|
async def runat(when: datetime, coro: Coroutine, logger: Logger) -> None:
|
|
"""
|
|
Run a task at a scheduled time.
|
|
|
|
Args:
|
|
when: When to run the task
|
|
coro: Coroutine to execute
|
|
logger: Global logger
|
|
"""
|
|
logger.debug(f"Scheduling task {coro.__name__} for {when.isoformat()}")
|
|
delay = when - datetime.now(tz=timezone.utc)
|
|
await asyncio.sleep(delay.total_seconds())
|
|
await coro
|