62 lines
2.1 KiB
Python
62 lines
2.1 KiB
Python
"""JARVIS Lock background task."""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from dis_snek import Snake
|
|
from dis_snek.client.utils.misc_utils import get
|
|
from dis_snek.models.discord.channel import GuildChannel
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Lock
|
|
|
|
from jarvis_tasks.util import runat
|
|
|
|
# Module-wide logger used by the lock task functions below.
logger = logging.getLogger(__name__)
# IDs of locks that already have an unlock scheduled; the polling loop skips
# these and the unlock routine removes an ID once it has finished.
queue = []
|
|
|
|
|
|
async def _unlock(channel: GuildChannel, lock: Lock) -> None:
    """
    Restore a locked channel's permissions and deactivate its lock record.

    Restoring the permission overwrite is best-effort: any failure there is
    logged at debug level and swallowed, so the lock record is deactivated
    regardless. The lock ID is always released from the module-level
    ``queue``, even if committing the record fails, so a later poll can
    schedule the lock again instead of leaving it stuck forever.

    Args:
        channel: Channel whose permission overwrite should be restored
        lock: Lock record to deactivate

    Raises:
        Exception: Whatever ``lock.commit()`` raises is propagated
    """
    logger.debug(f"Deactivating lock {lock.id}")
    try:
        # The overwrite is looked up by the guild ID stored on the lock.
        overwrite = get(channel.permission_overwrites, id=lock.guild)
        if overwrite and lock.original_perms:
            # An overwrite existed before the lock: put its old values back.
            overwrite.allow = lock.original_perms.allow
            overwrite.deny = lock.original_perms.deny
            await channel.edit_permission(overwrite, reason="Automatic unlock")
        elif overwrite:
            # No saved permissions: drop the overwrite entirely.
            await channel.delete_permission(target=overwrite, reason="Automatic unlock")
        else:
            logger.debug("Permission neither exists not existed")
    except Exception:
        # Presumably the channel is gone; keep the original message but attach
        # the traceback so an unrelated failure is not silently misreported.
        logger.debug("Locked channel deleted, ignoring error", exc_info=True)

    try:
        lock.active = False
        await lock.commit()
    finally:
        # Release the ID even when the commit fails; otherwise the polling
        # loop would skip this lock for the rest of the process lifetime.
        try:
            queue.remove(lock.id)
        except ValueError:
            logger.debug(f"Lock {lock.id} was not queued")
|
|
|
|
|
|
async def unlock(bot: Snake) -> None:
    """
    Unlock locked channels.

    Runs forever: once a minute it queries the active locks and, for every
    lock not already in the module-level ``queue``, schedules ``_unlock`` to
    run at ``created_at + duration`` minutes via ``runat``.

    Args:
        bot: Bot instance
    """
    logger.debug("Starting Task-lock")
    # The event loop only holds weak references to tasks, so keep strong ones
    # here until each task finishes to stop it being garbage-collected.
    pending = set()
    while True:
        max_ts = datetime.now(tz=timezone.utc) + timedelta(seconds=55)
        locks = Lock.find(q(active=True, created_at__lte=max_ts))
        async for lock in locks:
            if lock.id in queue:
                continue
            try:
                # Compute the deadline first so that a malformed record fails
                # before any coroutine object has been created.
                when = lock.created_at + timedelta(minutes=lock.duration)
                guild = await bot.fetch_guild(lock.guild)
                # NOTE(review): assumes fetch_guild may return None for an
                # unavailable guild - confirm against the dis_snek version in
                # use. _unlock swallows permission errors and still
                # deactivates the lock, so a missing channel is passed through.
                channel = await guild.fetch_channel(lock.channel) if guild is not None else None
            except Exception:
                # One bad lock must not kill the whole polling task; it is not
                # queued, so it is picked up again on the next cycle.
                logger.exception(f"Could not schedule unlock for lock {lock.id}")
                continue
            queue.append(lock.id)
            task = asyncio.create_task(runat(when, _unlock(channel, lock), logger))
            pending.add(task)
            task.add_done_callback(pending.discard)

        await asyncio.sleep(delay=60)
|