diff --git a/jarvis/cogs/admin/lock.py b/jarvis/cogs/admin/lock.py index e17cf58..eefdada 100644 --- a/jarvis/cogs/admin/lock.py +++ b/jarvis/cogs/admin/lock.py @@ -1,113 +1,117 @@ """J.A.R.V.I.S. LockCog.""" -# from dis_snek import Scale -# -# # TODO: Uncomment 99% of code once implementation is figured out -# from contextlib import suppress -# from typing import Union -# -# from dis_snek import InteractionContext, Scale, Snake -# from dis_snek.models.discord.enums import Permissions -# from dis_snek.models.discord.role import Role -# from dis_snek.models.discord.user import User -# from dis_snek.models.discord.channel import GuildText, GuildVoice, PermissionOverwrite -# from dis_snek.models.snek.application_commands import ( -# OptionTypes, -# PermissionTypes, -# slash_command, -# slash_option, -# ) -# from dis_snek.models.snek.command import check -# -# from jarvis.db.models import Lock -# from jarvis.utils.permissions import admin_or_permissions -# -# -# class LockCog(Scale): -# """J.A.R.V.I.S. LockCog.""" -# -# @slash_command(name="lock", description="Lock a channel") -# @slash_option(name="reason", -# description="Lock Reason", -# opt_type=3, -# required=True,) -# @slash_option(name="duration", -# description="Lock duration in minutes (default 10)", -# opt_type=4, -# required=False,) -# @slash_option(name="channel", -# description="Channel to lock", -# opt_type=7, -# required=False,) -# @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) -# async def _lock( -# self, -# ctx: InteractionContext, -# reason: str, -# duration: int = 10, -# channel: Union[GuildText, GuildVoice] = None, -# ) -> None: -# await ctx.defer(ephemeral=True) -# if duration <= 0: -# await ctx.send("Duration must be > 0", ephemeral=True) -# return -# -# elif duration > 60 * 12: -# await ctx.send("Duration must be <= 12 hours", ephemeral=True) -# return -# -# if len(reason) > 100: -# await ctx.send("Reason must be <= 100 characters", ephemeral=True) -# return -# if not channel: -# channel = ctx.channel -# -# # role = ctx.guild.default_role # Uncomment once implemented -# if isinstance(channel, GuildText): -# to_deny = Permissions.SEND_MESSAGES -# elif isinstance(channel, GuildVoice): -# to_deny = Permissions.CONNECT | Permissions.SPEAK -# -# overwrite = PermissionOverwrite(type=PermissionTypes.ROLE, deny=to_deny) -# # TODO: Get original permissions -# # TODO: Apply overwrite -# overwrite = overwrite -# _ = Lock( -# channel=channel.id, -# guild=ctx.guild.id, -# admin=ctx.author.id, -# reason=reason, -# duration=duration, -# ) # .save() # Uncomment once implemented -# # await ctx.send(f"{channel.mention} locked for {duration} minute(s)") -# await ctx.send("Unfortunately, this is not yet implemented", hidden=True) -# -# @cog_ext.cog_slash( -# name="unlock", -# description="Unlocks a channel", -# choices=[ -# create_option( -# name="channel", -# description="Channel to lock", -# opt_type=7, -# required=False, -# ), -# ], -# ) -# @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) -# async def _unlock( -# self, -# ctx: InteractionContext, -# channel: Union[GuildText, GuildVoice] = None, -# ) -> None: -# if not channel: -# channel = ctx.channel -# lock = Lock.objects(guild=ctx.guild.id, channel=channel.id, active=True).first() -# if not lock: -# await ctx.send(f"{channel.mention} not locked.", ephemeral=True) -# return -# for role in ctx.guild.roles: -# with suppress(Exception): -# await self._unlock_channel(channel, role, ctx.author) -# lock.active = False -# lock.save() -# await ctx.send(f"{channel.mention} unlocked") +from typing import Union + +from dis_snek import InteractionContext, Scale +from dis_snek.client.utils.misc_utils import get +from dis_snek.models.discord.channel import GuildText, GuildVoice +from dis_snek.models.discord.enums import Permissions +from dis_snek.models.snek.application_commands import ( + OptionTypes, + slash_command, + slash_option, +) +from dis_snek.models.snek.command import check +from jarvis_core.db import q +from jarvis_core.db.models import Lock, Permission + +from jarvis.utils.permissions import admin_or_permissions + + +class LockCog(Scale): + """J.A.R.V.I.S. LockCog.""" + + @slash_command(name="lock", description="Lock a channel") + @slash_option( + name="reason", + description="Lock Reason", + opt_type=3, + required=True, + ) + @slash_option( + name="duration", + description="Lock duration in minutes (default 10)", + opt_type=4, + required=False, + ) + @slash_option( + name="channel", + description="Channel to lock", + opt_type=7, + required=False, + ) + @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) + async def _lock( + self, + ctx: InteractionContext, + reason: str, + duration: int = 10, + channel: Union[GuildText, GuildVoice] = None, + ) -> None: + await ctx.defer(ephemeral=True) + if duration <= 0: + await ctx.send("Duration must be > 0", ephemeral=True) + return + + elif duration > 60 * 12: + await ctx.send("Duration must be <= 12 hours", ephemeral=True) + return + + if len(reason) > 100: + await ctx.send("Reason must be <= 100 characters", ephemeral=True) + return + if not channel: + channel = ctx.channel + + # role = ctx.guild.default_role # Uncomment once implemented + if isinstance(channel, GuildText): + to_deny = Permissions.SEND_MESSAGES + elif isinstance(channel, GuildVoice): + to_deny = Permissions.CONNECT | Permissions.SPEAK + + current = get(channel.permission_overwrites, id=ctx.guild.id) + if current: + current = Permission(id=ctx.guild.id, allow=int(current.allow), deny=int(current.deny)) + role = await ctx.guild.fetch_role(ctx.guild.id) + + await channel.add_permission(target=role, deny=to_deny, reason="Locked") + await Lock( + channel=channel.id, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + duration=duration, + original_perms=current, + ).commit() + await ctx.send(f"{channel.mention} locked for {duration} minute(s)") + + @slash_command(name="unlock", description="Unlock a channel") + @slash_option( + name="channel", + description="Channel to unlock", + opt_type=OptionTypes.CHANNEL, + required=False, + ) + @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) + async def _unlock( + self, + ctx: InteractionContext, + channel: Union[GuildText, GuildVoice] = None, + ) -> None: + if not channel: + channel = ctx.channel + lock = await Lock.find_one(q(guild=ctx.guild.id, channel=channel.id, active=True)) + if not lock: + await ctx.send(f"{channel.mention} not locked.", ephemeral=True) + return + + overwrite = get(channel.permission_overwrites, id=ctx.guild.id) + if overwrite and lock.original_perms: + overwrite.allow = lock.original_perms.allow + overwrite.deny = lock.original_perms.deny + await channel.edit_permission(overwrite, reason="Unlock") + elif overwrite and not lock.original_perms: + await channel.delete_permission(target=overwrite, reason="Unlock") + + lock.active = False + await lock.commit() + await ctx.send(f"{channel.mention} unlocked") diff --git a/jarvis/cogs/admin/lockdown.py b/jarvis/cogs/admin/lockdown.py index cd711fb..14407d8 100644 --- a/jarvis/cogs/admin/lockdown.py +++ b/jarvis/cogs/admin/lockdown.py @@ -1,101 +1,159 @@ """J.A.R.V.I.S. LockdownCog.""" -from contextlib import suppress -from datetime import datetime +from dis_snek import InteractionContext, Scale, Snake +from dis_snek.client.utils.misc_utils import find_all, get +from dis_snek.models.discord.channel import GuildCategory, GuildChannel +from dis_snek.models.discord.enums import Permissions +from dis_snek.models.discord.guild import Guild +from dis_snek.models.discord.user import Member +from dis_snek.models.snek.application_commands import ( + OptionTypes, + slash_command, + slash_option, +) +from dis_snek.models.snek.command import check +from jarvis_core.db import q +from jarvis_core.db.models import Lock, Lockdown, Permission -from discord.ext import commands -from discord_slash import SlashContext, cog_ext -from discord_slash.utils.manage_commands import create_option - -from jarvis.db.models import Lock -from jarvis.utils.cachecog import CacheCog - -# from jarvis.utils.permissions import admin_or_permissions +from jarvis.utils.permissions import admin_or_permissions -class LockdownCog(CacheCog): +async def lock(bot: Snake, target: GuildChannel, admin: Member, reason: str, duration: int) -> None: + """ + Lock an existing channel + + Args: + bot: Bot instance + target: Target channel + admin: Admin who initiated lockdown + """ + to_deny = Permissions.SEND_MESSAGES | Permissions.CONNECT | Permissions.SPEAK + current = get(target.permission_overwrites, id=target.guild.id) + if current: + current = Permission(id=target.guild.id, allow=int(current.allow), deny=int(current.deny)) + role = await target.guild.fetch_role(target.guild.id) + await target.add_permission(target=role, deny=to_deny, reason="Lockdown") + await Lock( + channel=target.id, + guild=target.guild.id, + admin=admin.id, + reason=reason, + duration=duration, + original_perms=current, + ).commit() + + +async def lock_all(bot: Snake, guild: Guild, admin: Member, reason: str, duration: int) -> None: + """ + Lock all channels + + Args: + bot: Bot instance + guild: Target guild + admin: Admin who initiated lockdown + """ + role = await guild.fetch_role(guild.id) + categories = find_all(lambda x: isinstance(x, GuildCategory), guild.channels) + for category in categories: + await lock(bot, category, admin, reason, duration) + perms = category.permissions_for(role) + + for channel in category.channels: + if perms != channel.permissions_for(role): + await lock(bot, channel, admin, reason, duration) + + +async def unlock_all(bot: Snake, guild: Guild, admin: Member) -> None: + """ + Unlock all locked channels + + Args: + bot: Bot instance + target: Target channel + admin: Admin who ended lockdown + """ + locks = Lock.find(q(guild=guild.id, active=True)) + async for lock in locks: + target = await guild.fetch_channel(lock.channel) + if target: + overwrite = get(target.permission_overwrites, id=guild.id) + if overwrite and lock.original_perms: + overwrite.allow = lock.original_perms.allow + overwrite.deny = lock.original_perms.deny + await target.edit_permission(overwrite, reason="Lockdown end") + elif overwrite and not lock.original_perms: + await target.delete_permission(target=overwrite, reason="Lockdown end") + lock.active = False + await lock.commit() + lockdown = await Lockdown.find_one(q(guild=guild.id, active=True)) + if lockdown: + lockdown.active = False + await lockdown.commit() + + +class LockdownCog(Scale): """J.A.R.V.I.S. LockdownCog.""" - def __init__(self, bot: commands.Bot): - super().__init__(bot) - - @cog_ext.cog_subcommand( - base="lockdown", - name="start", - description="Locks a server", - choices=[ - create_option( - name="reason", - description="Lockdown Reason", - opt_type=3, - required=True, - ), - create_option( - name="duration", - description="Lockdown duration in minutes (default 10)", - opt_type=4, - required=False, - ), - ], + @slash_command( + name="lockdown", + description="Manage server-wide lockdown", + sub_cmd_name="start", + sub_cmd_description="Lockdown the server", ) - # @check(admin_or_permissions(manage_channels=True)) + @slash_option( + name="reason", description="Lockdown reason", opt_type=OptionTypes.STRING, required=True + ) + @slash_option( + name="duration", + description="Duration in minutes", + opt_type=OptionTypes.INTEGER, + required=False, + ) + @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) async def _lockdown_start( self, - ctx: SlashContext, + ctx: InteractionContext, reason: str, duration: int = 10, ) -> None: - await ctx.defer(ephemeral=True) + await ctx.defer() if duration <= 0: await ctx.send("Duration must be > 0", ephemeral=True) return elif duration >= 300: await ctx.send("Duration must be < 5 hours", ephemeral=True) return - channels = ctx.guild.channels - roles = ctx.guild.roles - updates = [] - for channel in channels: - for role in roles: - with suppress(Exception): - await self._lock_channel(channel, role, ctx.author, reason) - updates.append( - Lock( - channel=channel.id, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - duration=duration, - active=True, - created_at=datetime.utcnow(), - ) - ) - if updates: - Lock.objects().insert(updates) - await ctx.send(f"Server locked for {duration} minute(s)") - @cog_ext.cog_subcommand( - base="lockdown", - name="end", - description="Unlocks a server", - ) - @commands.has_permissions(administrator=True) + exists = await Lockdown.find_one(q(guild=ctx.guild.id, active=True)) + if exists: + await ctx.send("Server already in lockdown", ephemeral=True) + return + + await lock_all(self.bot, ctx.guild, ctx.author, reason, duration) + role = await ctx.guild.fetch_role(ctx.guild.id) + original_perms = role.permissions + new_perms = role.permissions & ~Permissions.SEND_MESSAGES + await role.edit(permissions=new_perms) + await Lockdown( + admin=ctx.author.id, + duration=duration, + guild=ctx.guild.id, + reason=reason, + original_perms=int(original_perms), + ).commit() + await ctx.send("Server now in lockdown.") + + @slash_command(name="lockdown", sub_cmd_name="end", sub_cmd_description="End a lockdown") + @check(admin_or_permissions(Permissions.MANAGE_CHANNELS)) async def _lockdown_end( self, - ctx: SlashContext, + ctx: InteractionContext, ) -> None: - channels = ctx.guild.channels - roles = ctx.guild.roles - update = False - locks = Lock.objects(guild=ctx.guild.id, active=True) - if not locks: - await ctx.send("No lockdown detected.", ephemeral=True) - return await ctx.defer() - for channel in channels: - for role in roles: - with suppress(Exception): - await self._unlock_channel(channel, role, ctx.author) - update = True - if update: - Lock.objects(guild=ctx.guild.id, active=True).update(set__active=False) - await ctx.send("Server unlocked") + + lockdown = await Lockdown.find_one(q(guild=ctx.guild.id, active=True)) + if not lockdown: + await ctx.send("Server not in lockdown", ephemeral=True) + return + + await unlock_all(self.bot, ctx.guild, ctx.author) + await ctx.send("Server no longer in lockdown.")