168 lines
5.6 KiB
Python
168 lines
5.6 KiB
Python
"""JARVIS LockdownCog."""
|
|
import logging
|
|
|
|
from dis_snek import InteractionContext, Scale, Snake
|
|
from dis_snek.client.utils.misc_utils import find_all, get
|
|
from dis_snek.models.discord.channel import GuildCategory, GuildChannel
|
|
from dis_snek.models.discord.enums import Permissions
|
|
from dis_snek.models.discord.guild import Guild
|
|
from dis_snek.models.discord.user import Member
|
|
from dis_snek.models.snek.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from dis_snek.models.snek.command import check
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Lock, Lockdown, Permission
|
|
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
async def lock(bot: Snake, target: GuildChannel, admin: Member, reason: str, duration: int) -> None:
    """
    Lock an existing channel

    Denies SEND_MESSAGES, CONNECT and SPEAK on ``target`` for the role whose
    ID equals the guild ID, then stores a ``Lock`` record that carries the
    overwrite that was in place beforehand (``None`` when there was none).

    Args:
        bot: Bot instance
        target: Target channel
        admin: Admin who initiated lockdown
        reason: Reason saved on the lock record
        duration: Duration saved on the lock record
    """
    to_deny = Permissions.SEND_MESSAGES | Permissions.CONNECT | Permissions.SPEAK

    # Overwrite for the guild-ID role (the default role), if the channel has one.
    existing = get(target.permission_overwrites, id=target.guild.id)

    original_perms = None
    allow = None
    deny = to_deny
    if existing:
        # Snapshot the untouched overwrite so it can be restored on unlock.
        original_perms = Permission(
            id=target.guild.id, allow=int(existing.allow), deny=int(existing.deny)
        )
        # Carry the channel's existing bits over instead of replacing them:
        # writing only the lockdown denies would drop unrelated settings
        # (e.g. a VIEW_CHANNEL deny on a hidden channel) while locked.
        allow = existing.allow & ~to_deny
        deny = existing.deny | to_deny

    role = await target.guild.fetch_role(target.guild.id)
    await target.add_permission(target=role, allow=allow, deny=deny, reason="Lockdown")
    await Lock(
        channel=target.id,
        guild=target.guild.id,
        admin=admin.id,
        reason=reason,
        duration=duration,
        original_perms=original_perms,
    ).commit()
|
|
|
|
|
|
async def lock_all(bot: Snake, guild: Guild, admin: Member, reason: str, duration: int) -> None:
    """
    Lock all channels

    Every category is locked first. A channel inside a category is locked
    individually only when its permissions for the default role differ from
    the category's after the category lock. Channels that belong to no
    category are locked individually as well.

    Args:
        bot: Bot instance
        guild: Target guild
        admin: Admin who initiated lockdown
        reason: Reason forwarded to each lock
        duration: Duration forwarded to each lock
    """
    role = await guild.fetch_role(guild.id)
    categories = find_all(lambda x: isinstance(x, GuildCategory), guild.channels)

    # IDs of categories and of every channel reached through a category.
    handled = set()

    for category in categories:
        await lock(bot, category, admin, reason, duration)
        handled.add(category.id)
        perms = category.permissions_for(role)

        for channel in category.channels:
            handled.add(channel.id)
            # Only channels whose permissions differ from the category's
            # need their own lock.
            if perms != channel.permissions_for(role):
                await lock(bot, channel, admin, reason, duration)

    # Channels outside any category were never visited above; without this
    # pass they would stay open during a lockdown.
    for channel in guild.channels:
        if channel is not None and channel.id not in handled:
            await lock(bot, channel, admin, reason, duration)
|
|
|
|
|
|
async def unlock_all(bot: Snake, guild: Guild, admin: Member) -> None:
    """
    Unlock all locked channels

    Restores (or removes) the default-role overwrite on every channel that
    has an active ``Lock`` record, marks those records inactive, then closes
    the guild's active ``Lockdown`` record and gives SEND_MESSAGES back to
    the default role if the role had it before the lockdown.

    Args:
        bot: Bot instance
        guild: Target guild
        admin: Admin who ended lockdown
    """
    locks = Lock.find(q(guild=guild.id, active=True))
    # Loop variable deliberately not named `lock`, to avoid shadowing the
    # module-level lock() coroutine.
    async for entry in locks:
        target = await guild.fetch_channel(entry.channel)
        if target:
            overwrite = get(target.permission_overwrites, id=guild.id)
            if overwrite and entry.original_perms:
                # An overwrite existed before the lock: put its bits back.
                overwrite.allow = entry.original_perms.allow
                overwrite.deny = entry.original_perms.deny
                await target.edit_permission(overwrite, reason="Lockdown end")
            elif overwrite and not entry.original_perms:
                # The overwrite was created by the lock itself: remove it.
                await target.delete_permission(target=overwrite, reason="Lockdown end")
        # Deactivate the record even when the channel did not resolve, so it
        # is not picked up again on the next unlock.
        entry.active = False
        await entry.commit()

    lockdown = await Lockdown.find_one(q(guild=guild.id, active=True))
    if lockdown:
        # Starting a lockdown strips SEND_MESSAGES from the default role and
        # snapshots its permissions; re-grant the bit only if the snapshot had
        # it, leaving any other role changes made during the lockdown intact.
        snapshot = lockdown.original_perms
        if snapshot and Permissions(snapshot) & Permissions.SEND_MESSAGES:
            role = await guild.fetch_role(guild.id)
            await role.edit(permissions=role.permissions | Permissions.SEND_MESSAGES)
        lockdown.active = False
        await lockdown.commit()
|
|
|
|
|
|
class LockdownCog(Scale):
    """JARVIS LockdownCog."""

    def __init__(self, bot: Snake):
        self.bot = bot
        self.logger = logging.getLogger(__name__)

    # Base `/lockdown` command; the subcommands below hang off it.
    lockdown = SlashCommand(
        name="lockdown",
        description="Manage server-wide lockdown",
    )

    @lockdown.subcommand(
        sub_cmd_name="start",
        sub_cmd_description="Lockdown the server",
    )
    @slash_option(
        name="reason", description="Lockdown reason", opt_type=OptionTypes.STRING, required=True
    )
    @slash_option(
        name="duration",
        description="Duration in minutes",
        opt_type=OptionTypes.INTEGER,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
    async def _lockdown_start(
        self,
        ctx: InteractionContext,
        reason: str,
        duration: int = 10,
    ) -> None:
        """
        Start a server-wide lockdown.

        Args:
            ctx: Interaction context of the command invocation
            reason: Lockdown reason, stored on the lock and lockdown records
            duration: Duration in minutes; must satisfy 0 < duration < 300
        """
        # Validate before deferring: the defer below is public, and a reply
        # sent after it cannot be switched to ephemeral.
        if duration <= 0:
            await ctx.send("Duration must be > 0", ephemeral=True)
            return
        elif duration >= 300:
            await ctx.send("Duration must be < 5 hours", ephemeral=True)
            return

        await ctx.defer()

        # NOTE(review): this reply follows the public defer, so its
        # ephemeral flag is likely ignored — confirm whether the lookup
        # should also run before defer().
        exists = await Lockdown.find_one(q(guild=ctx.guild.id, active=True))
        if exists:
            await ctx.send("Server already in lockdown", ephemeral=True)
            return

        await lock_all(self.bot, ctx.guild, ctx.author, reason, duration)

        # Also remove SEND_MESSAGES from the guild-ID role itself, keeping a
        # snapshot of its permissions on the Lockdown record.
        role = await ctx.guild.fetch_role(ctx.guild.id)
        original_perms = role.permissions
        new_perms = role.permissions & ~Permissions.SEND_MESSAGES
        await role.edit(permissions=new_perms)
        await Lockdown(
            admin=ctx.author.id,
            duration=duration,
            guild=ctx.guild.id,
            reason=reason,
            original_perms=int(original_perms),
        ).commit()
        await ctx.send("Server now in lockdown.")

    @lockdown.subcommand(sub_cmd_name="end", sub_cmd_description="End a lockdown")
    @check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
    async def _lockdown_end(
        self,
        ctx: InteractionContext,
    ) -> None:
        """
        End the active server-wide lockdown, if there is one.

        Args:
            ctx: Interaction context of the command invocation
        """
        await ctx.defer()

        lockdown = await Lockdown.find_one(q(guild=ctx.guild.id, active=True))
        if not lockdown:
            await ctx.send("Server not in lockdown", ephemeral=True)
            return

        await unlock_all(self.bot, ctx.guild, ctx.author)
        await ctx.send("Server no longer in lockdown.")
|