fix lock and lockdown cogs

This commit is contained in:
Zeva Rose 2022-03-23 01:23:47 -06:00
parent 54dd6f40a8
commit e80afc97ad
2 changed files with 252 additions and 190 deletions

View file

@ -1,113 +1,117 @@
"""J.A.R.V.I.S. LockCog."""
# from dis_snek import Scale
#
# # TODO: Uncomment 99% of code once implementation is figured out
# from contextlib import suppress
# from typing import Union
#
# from dis_snek import InteractionContext, Scale, Snake
# from dis_snek.models.discord.enums import Permissions
# from dis_snek.models.discord.role import Role
# from dis_snek.models.discord.user import User
# from dis_snek.models.discord.channel import GuildText, GuildVoice, PermissionOverwrite
# from dis_snek.models.snek.application_commands import (
# OptionTypes,
# PermissionTypes,
# slash_command,
# slash_option,
# )
# from dis_snek.models.snek.command import check
#
# from jarvis.db.models import Lock
# from jarvis.utils.permissions import admin_or_permissions
#
#
# class LockCog(Scale):
# """J.A.R.V.I.S. LockCog."""
#
# @slash_command(name="lock", description="Lock a channel")
# @slash_option(name="reason",
# description="Lock Reason",
# opt_type=3,
# required=True,)
# @slash_option(name="duration",
# description="Lock duration in minutes (default 10)",
# opt_type=4,
# required=False,)
# @slash_option(name="channel",
# description="Channel to lock",
# opt_type=7,
# required=False,)
# @check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
# async def _lock(
# self,
# ctx: InteractionContext,
# reason: str,
# duration: int = 10,
# channel: Union[GuildText, GuildVoice] = None,
# ) -> None:
# await ctx.defer(ephemeral=True)
# if duration <= 0:
# await ctx.send("Duration must be > 0", ephemeral=True)
# return
#
# elif duration > 60 * 12:
# await ctx.send("Duration must be <= 12 hours", ephemeral=True)
# return
#
# if len(reason) > 100:
# await ctx.send("Reason must be <= 100 characters", ephemeral=True)
# return
# if not channel:
# channel = ctx.channel
#
# # role = ctx.guild.default_role # Uncomment once implemented
# if isinstance(channel, GuildText):
# to_deny = Permissions.SEND_MESSAGES
# elif isinstance(channel, GuildVoice):
# to_deny = Permissions.CONNECT | Permissions.SPEAK
#
# overwrite = PermissionOverwrite(type=PermissionTypes.ROLE, deny=to_deny)
# # TODO: Get original permissions
# # TODO: Apply overwrite
# overwrite = overwrite
# _ = Lock(
# channel=channel.id,
# guild=ctx.guild.id,
# admin=ctx.author.id,
# reason=reason,
# duration=duration,
# ) # .save() # Uncomment once implemented
# # await ctx.send(f"{channel.mention} locked for {duration} minute(s)")
# await ctx.send("Unfortunately, this is not yet implemented", hidden=True)
#
# @cog_ext.cog_slash(
# name="unlock",
# description="Unlocks a channel",
# choices=[
# create_option(
# name="channel",
# description="Channel to lock",
# opt_type=7,
# required=False,
# ),
# ],
# )
# @check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
# async def _unlock(
# self,
# ctx: InteractionContext,
# channel: Union[GuildText, GuildVoice] = None,
# ) -> None:
# if not channel:
# channel = ctx.channel
# lock = Lock.objects(guild=ctx.guild.id, channel=channel.id, active=True).first()
# if not lock:
# await ctx.send(f"{channel.mention} not locked.", ephemeral=True)
# return
# for role in ctx.guild.roles:
# with suppress(Exception):
# await self._unlock_channel(channel, role, ctx.author)
# lock.active = False
# lock.save()
# await ctx.send(f"{channel.mention} unlocked")
from typing import Union
from dis_snek import InteractionContext, Scale
from dis_snek.client.utils.misc_utils import get
from dis_snek.models.discord.channel import GuildText, GuildVoice
from dis_snek.models.discord.enums import Permissions
from dis_snek.models.snek.application_commands import (
OptionTypes,
slash_command,
slash_option,
)
from dis_snek.models.snek.command import check
from jarvis_core.db import q
from jarvis_core.db.models import Lock, Permission
from jarvis.utils.permissions import admin_or_permissions
class LockCog(Scale):
"""J.A.R.V.I.S. LockCog."""
@slash_command(name="lock", description="Lock a channel")
@slash_option(
name="reason",
description="Lock Reason",
opt_type=3,
required=True,
)
@slash_option(
name="duration",
description="Lock duration in minutes (default 10)",
opt_type=4,
required=False,
)
@slash_option(
name="channel",
description="Channel to lock",
opt_type=7,
required=False,
)
@check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
async def _lock(
self,
ctx: InteractionContext,
reason: str,
duration: int = 10,
channel: Union[GuildText, GuildVoice] = None,
) -> None:
await ctx.defer(ephemeral=True)
if duration <= 0:
await ctx.send("Duration must be > 0", ephemeral=True)
return
elif duration > 60 * 12:
await ctx.send("Duration must be <= 12 hours", ephemeral=True)
return
if len(reason) > 100:
await ctx.send("Reason must be <= 100 characters", ephemeral=True)
return
if not channel:
channel = ctx.channel
# role = ctx.guild.default_role # Uncomment once implemented
if isinstance(channel, GuildText):
to_deny = Permissions.SEND_MESSAGES
elif isinstance(channel, GuildVoice):
to_deny = Permissions.CONNECT | Permissions.SPEAK
current = get(channel.permission_overwrites, id=ctx.guild.id)
if current:
current = Permission(id=ctx.guild.id, allow=int(current.allow), deny=int(current.deny))
role = await ctx.guild.fetch_role(ctx.guild.id)
await channel.add_permission(target=role, deny=to_deny, reason="Locked")
await Lock(
channel=channel.id,
guild=ctx.guild.id,
admin=ctx.author.id,
reason=reason,
duration=duration,
original_perms=current,
).commit()
await ctx.send(f"{channel.mention} locked for {duration} minute(s)")
@slash_command(name="unlock", description="Unlock a channel")
@slash_option(
name="channel",
description="Channel to unlock",
opt_type=OptionTypes.CHANNEL,
required=False,
)
@check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
async def _unlock(
self,
ctx: InteractionContext,
channel: Union[GuildText, GuildVoice] = None,
) -> None:
if not channel:
channel = ctx.channel
lock = await Lock.find_one(q(guild=ctx.guild.id, channel=channel.id, active=True))
if not lock:
await ctx.send(f"{channel.mention} not locked.", ephemeral=True)
return
overwrite = get(channel.permission_overwrites, id=ctx.guild.id)
if overwrite and lock.original_perms:
overwrite.allow = lock.original_perms.allow
overwrite.deny = lock.original_perms.deny
await channel.edit_permission(overwrite, reason="Unlock")
elif overwrite and not lock.original_perms:
await channel.delete_permission(target=overwrite, reason="Unlock")
lock.active = False
await lock.commit()
await ctx.send(f"{channel.mention} unlocked")

View file

@ -1,101 +1,159 @@
"""J.A.R.V.I.S. LockdownCog."""
from contextlib import suppress
from datetime import datetime
from dis_snek import InteractionContext, Scale, Snake
from dis_snek.client.utils.misc_utils import find_all, get
from dis_snek.models.discord.channel import GuildCategory, GuildChannel
from dis_snek.models.discord.enums import Permissions
from dis_snek.models.discord.guild import Guild
from dis_snek.models.discord.user import Member
from dis_snek.models.snek.application_commands import (
OptionTypes,
slash_command,
slash_option,
)
from dis_snek.models.snek.command import check
from jarvis_core.db import q
from jarvis_core.db.models import Lock, Lockdown, Permission
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.models import Lock
from jarvis.utils.cachecog import CacheCog
# from jarvis.utils.permissions import admin_or_permissions
from jarvis.utils.permissions import admin_or_permissions
class LockdownCog(CacheCog):
async def lock(bot: Snake, target: GuildChannel, admin: Member, reason: str, duration: int) -> None:
"""
Lock an existing channel
Args:
bot: Bot instance
target: Target channel
admin: Admin who initiated lockdown
"""
to_deny = Permissions.SEND_MESSAGES | Permissions.CONNECT | Permissions.SPEAK
current = get(target.permission_overwrites, id=target.guild.id)
if current:
current = Permission(id=target.guild.id, allow=int(current.allow), deny=int(current.deny))
role = await target.guild.fetch_role(target.guild.id)
await target.add_permission(target=role, deny=to_deny, reason="Lockdown")
await Lock(
channel=target.id,
guild=target.guild.id,
admin=admin.id,
reason=reason,
duration=duration,
original_perms=current,
).commit()
async def lock_all(bot: Snake, guild: Guild, admin: Member, reason: str, duration: int) -> None:
"""
Lock all channels
Args:
bot: Bot instance
guild: Target guild
admin: Admin who initiated lockdown
"""
role = await guild.fetch_role(guild.id)
categories = find_all(lambda x: isinstance(x, GuildCategory), guild.channels)
for category in categories:
await lock(bot, category, admin, reason, duration)
perms = category.permissions_for(role)
for channel in category.channels:
if perms != channel.permissions_for(role):
await lock(bot, channel, admin, reason, duration)
async def unlock_all(bot: Snake, guild: Guild, admin: Member) -> None:
"""
Unlock all locked channels
Args:
bot: Bot instance
target: Target channel
admin: Admin who ended lockdown
"""
locks = Lock.find(q(guild=guild.id, active=True))
async for lock in locks:
target = await guild.fetch_channel(lock.channel)
if target:
overwrite = get(target.permission_overwrites, id=guild.id)
if overwrite and lock.original_perms:
overwrite.allow = lock.original_perms.allow
overwrite.deny = lock.original_perms.deny
await target.edit_permission(overwrite, reason="Lockdown end")
elif overwrite and not lock.original_perms:
await target.delete_permission(target=overwrite, reason="Lockdown end")
lock.active = False
await lock.commit()
lockdown = await Lockdown.find_one(q(guild=guild.id, active=True))
if lockdown:
lockdown.active = False
await lockdown.commit()
class LockdownCog(Scale):
"""J.A.R.V.I.S. LockdownCog."""
def __init__(self, bot: commands.Bot):
super().__init__(bot)
@cog_ext.cog_subcommand(
base="lockdown",
name="start",
description="Locks a server",
choices=[
create_option(
name="reason",
description="Lockdown Reason",
opt_type=3,
required=True,
),
create_option(
name="duration",
description="Lockdown duration in minutes (default 10)",
opt_type=4,
required=False,
),
],
@slash_command(
name="lockdown",
description="Manage server-wide lockdown",
sub_cmd_name="start",
sub_cmd_description="Lockdown the server",
)
# @check(admin_or_permissions(manage_channels=True))
@slash_option(
name="reason", description="Lockdown reason", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="duration",
description="Duration in minutes",
opt_type=OptionTypes.INTEGER,
required=False,
)
@check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
async def _lockdown_start(
self,
ctx: SlashContext,
ctx: InteractionContext,
reason: str,
duration: int = 10,
) -> None:
await ctx.defer(ephemeral=True)
await ctx.defer()
if duration <= 0:
await ctx.send("Duration must be > 0", ephemeral=True)
return
elif duration >= 300:
await ctx.send("Duration must be < 5 hours", ephemeral=True)
return
channels = ctx.guild.channels
roles = ctx.guild.roles
updates = []
for channel in channels:
for role in roles:
with suppress(Exception):
await self._lock_channel(channel, role, ctx.author, reason)
updates.append(
Lock(
channel=channel.id,
guild=ctx.guild.id,
admin=ctx.author.id,
reason=reason,
duration=duration,
active=True,
created_at=datetime.utcnow(),
)
)
if updates:
Lock.objects().insert(updates)
await ctx.send(f"Server locked for {duration} minute(s)")
@cog_ext.cog_subcommand(
base="lockdown",
name="end",
description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
exists = await Lockdown.find_one(q(guild=ctx.guild.id, active=True))
if exists:
await ctx.send("Server already in lockdown", ephemeral=True)
return
await lock_all(self.bot, ctx.guild, ctx.author, reason, duration)
role = await ctx.guild.fetch_role(ctx.guild.id)
original_perms = role.permissions
new_perms = role.permissions & ~Permissions.SEND_MESSAGES
await role.edit(permissions=new_perms)
await Lockdown(
admin=ctx.author.id,
duration=duration,
guild=ctx.guild.id,
reason=reason,
original_perms=int(original_perms),
).commit()
await ctx.send("Server now in lockdown.")
@slash_command(name="lockdown", sub_cmd_name="end", sub_cmd_description="End a lockdown")
@check(admin_or_permissions(Permissions.MANAGE_CHANNELS))
async def _lockdown_end(
self,
ctx: SlashContext,
ctx: InteractionContext,
) -> None:
channels = ctx.guild.channels
roles = ctx.guild.roles
update = False
locks = Lock.objects(guild=ctx.guild.id, active=True)
if not locks:
await ctx.send("No lockdown detected.", ephemeral=True)
return
await ctx.defer()
for channel in channels:
for role in roles:
with suppress(Exception):
await self._unlock_channel(channel, role, ctx.author)
update = True
if update:
Lock.objects(guild=ctx.guild.id, active=True).update(set__active=False)
await ctx.send("Server unlocked")
lockdown = await Lockdown.find_one(q(guild=ctx.guild.id, active=True))
if not lockdown:
await ctx.send("Server not in lockdown", ephemeral=True)
return
await unlock_all(self.bot, ctx.guild, ctx.author)
await ctx.send("Server no longer in lockdown.")