Fix admin-related commands. TODO: Settings, Lock, Lockdown, Mute

This commit is contained in:
Zeva Rose 2022-03-18 19:29:04 -06:00
parent 3921c41f15
commit a9d736c7a8
4 changed files with 87 additions and 107 deletions

View file

@ -302,14 +302,6 @@ class BanCog(Scale):
@check(admin_or_permissions(Permissions.BAN_MEMBERS)) @check(admin_or_permissions(Permissions.BAN_MEMBERS))
async def _bans_list(self, ctx: InteractionContext, btype: int = 0, active: int = 1) -> None: async def _bans_list(self, ctx: InteractionContext, btype: int = 0, active: int = 1) -> None:
active = bool(active) active = bool(active)
exists = self.check_cache(ctx, btype=btype, active=active)
if exists:
await ctx.defer(ephemeral=True)
await ctx.send(
f"Please use existing interaction: {exists['paginator']._message.jump_url}",
ephemeral=True,
)
return
types = [0, "perm", "temp", "soft"] types = [0, "perm", "temp", "soft"]
search = {"guild": ctx.guild.id} search = {"guild": ctx.guild.id}
if active: if active:

View file

@ -37,13 +37,12 @@ class PurgeCog(Scale):
async for message in ctx.channel.history(limit=amount + 1): async for message in ctx.channel.history(limit=amount + 1):
messages.append(message) messages.append(message)
await ctx.channel.delete_messages(messages, reason=f"Purge by {ctx.author.username}") await ctx.channel.delete_messages(messages, reason=f"Purge by {ctx.author.username}")
p = Purge( await Purge(
channel=ctx.channel.id, channel=ctx.channel.id,
guild=ctx.guild.id, guild=ctx.guild.id,
admin=ctx.author.id, admin=ctx.author.id,
count=amount, count=amount,
) ).commit()
await p.commit()
@slash_command( @slash_command(
name="autopurge", sub_cmd_name="add", sub_cmd_description="Automatically purge messages" name="autopurge", sub_cmd_name="add", sub_cmd_description="Automatically purge messages"
@ -79,13 +78,12 @@ class PurgeCog(Scale):
await ctx.send("Autopurge already exists.", ephemeral=True) await ctx.send("Autopurge already exists.", ephemeral=True)
return return
p = Autopurge( await Autopurge(
guild=ctx.guild.id, guild=ctx.guild.id,
channel=channel.id, channel=channel.id,
admin=ctx.author.id, admin=ctx.author.id,
delay=delay, delay=delay,
) ).commit()
await p.commit()
await ctx.send(f"Autopurge set up on {channel.mention}, delay is {delay} seconds") await ctx.send(f"Autopurge set up on {channel.mention}, delay is {delay} seconds")

View file

@ -32,13 +32,13 @@ class RolepingCog(Scale):
await ctx.send(f"Role `{role.name}` already in roleping.", ephemeral=True) await ctx.send(f"Role `{role.name}` already in roleping.", ephemeral=True)
return return
_ = Roleping( _ = await Roleping(
role=role.id, role=role.id,
guild=ctx.guild.id, guild=ctx.guild.id,
admin=ctx.author.id, admin=ctx.author.id,
active=True, active=True,
bypass={"roles": [], "users": []}, bypass={"roles": [], "users": []},
).save() ).commit()
await ctx.send(f"Role `{role.name}` added to roleping.") await ctx.send(f"Role `{role.name}` added to roleping.")
@slash_command(name="roleping", sub_cmd_name="remove", sub_cmd_description="Remove a role") @slash_command(name="roleping", sub_cmd_name="remove", sub_cmd_description="Remove a role")
@ -47,29 +47,29 @@ class RolepingCog(Scale):
) )
@check(admin_or_permissions(Permissions.MANAGE_GUILD)) @check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _roleping_remove(self, ctx: InteractionContext, role: Role) -> None: async def _roleping_remove(self, ctx: InteractionContext, role: Role) -> None:
roleping = Roleping.objects(guild=ctx.guild.id, role=role.id) roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=role.id))
if not roleping: if not roleping:
await ctx.send("Roleping does not exist", ephemeral=True) await ctx.send("Roleping does not exist", ephemeral=True)
return return
roleping.delete() await roleping.delete()
await ctx.send(f"Role `{role.name}` removed from roleping.") await ctx.send(f"Role `{role.name}` removed from roleping.")
@slash_command(name="roleping", sub_cmd_name="list", description="Lick all blocklisted roles") @slash_command(name="roleping", sub_cmd_name="list", description="Lick all blocklisted roles")
async def _roleping_list(self, ctx: InteractionContext) -> None: async def _roleping_list(self, ctx: InteractionContext) -> None:
rolepings = Roleping.objects(guild=ctx.guild.id) rolepings = await Roleping.find(q(guild=ctx.guild.id)).to_list(None)
if not rolepings: if not rolepings:
await ctx.send("No rolepings configured", ephemeral=True) await ctx.send("No rolepings configured", ephemeral=True)
return return
embeds = [] embeds = []
for roleping in rolepings: for roleping in rolepings:
role = await ctx.guild.get_role(roleping.role) role = await ctx.guild.fetch_role(roleping.role)
broles = find_all(lambda x: x.id in roleping.bypass["roles"], ctx.guild.roles) broles = find_all(lambda x: x.id in roleping.bypass["roles"], ctx.guild.roles)
bypass_roles = [r.mention or "||`[redacted]`||" for r in broles] bypass_roles = [r.mention or "||`[redacted]`||" for r in broles]
bypass_users = [ bypass_users = [
await ctx.guild.get_member(u).mention or "||`[redacted]`||" (await ctx.guild.fetch_member(u)).mention or "||`[redacted]`||"
for u in roleping.bypass["users"] for u in roleping.bypass["users"]
] ]
bypass_roles = bypass_roles or ["None"] bypass_roles = bypass_roles or ["None"]
@ -84,24 +84,26 @@ class RolepingCog(Scale):
value=roleping.created_at.strftime("%a, %b %d, %Y %I:%M %p"), value=roleping.created_at.strftime("%a, %b %d, %Y %I:%M %p"),
inline=False, inline=False,
), ),
EmbedField(name="Active", value=str(roleping.active)), # EmbedField(name="Active", value=str(roleping.active), inline=True),
EmbedField( EmbedField(
name="Bypass Users", name="Bypass Users",
value="\n".join(bypass_users), value="\n".join(bypass_users),
inline=True,
), ),
EmbedField( EmbedField(
name="Bypass Roles", name="Bypass Roles",
value="\n".join(bypass_roles), value="\n".join(bypass_roles),
inline=True,
), ),
], ],
) )
admin = await ctx.guild.get_member(roleping.admin) admin = await ctx.guild.fetch_member(roleping.admin)
if not admin: if not admin:
admin = self.bot.user admin = self.bot.user
embed.set_author(name=admin.display_name, icon_url=admin.display_avatar.url) embed.set_author(name=admin.display_name, icon_url=admin.display_avatar.url)
embed.set_footer(text=f"{admin.name}#{admin.discriminator} | {admin.id}") embed.set_footer(text=f"{admin.username}#{admin.discriminator} | {admin.id}")
embeds.append(embed) embeds.append(embed)
@ -117,21 +119,23 @@ class RolepingCog(Scale):
sub_cmd_name="user", sub_cmd_name="user",
sub_cmd_description="Add a user as a bypass to a roleping", sub_cmd_description="Add a user as a bypass to a roleping",
) )
@slash_option(name="user", description="User to add", opt_type=OptionTypes.USER, required=True)
@slash_option( @slash_option(
name="rping", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True name="bypass", description="User to add", opt_type=OptionTypes.USER, required=True
)
@slash_option(
name="role", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True
) )
@check(admin_or_permissions(Permissions.MANAGE_GUILD)) @check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _roleping_bypass_user( async def _roleping_bypass_user(
self, ctx: InteractionContext, user: Member, rping: Role self, ctx: InteractionContext, bypass: Member, role: Role
) -> None: ) -> None:
roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=rping.id)) roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=role.id))
if not roleping: if not roleping:
await ctx.send(f"Roleping not configured for {rping.mention}", ephemeral=True) await ctx.send(f"Roleping not configured for {role.mention}", ephemeral=True)
return return
if user.id in roleping.bypass["users"]: if bypass.id in roleping.bypass["users"]:
await ctx.send(f"{user.mention} already in bypass", ephemeral=True) await ctx.send(f"{bypass.mention} already in bypass", ephemeral=True)
return return
if len(roleping.bypass["users"]) == 10: if len(roleping.bypass["users"]) == 10:
@ -141,18 +145,18 @@ class RolepingCog(Scale):
) )
return return
matching_role = list(filter(lambda x: x.id in roleping.bypass["roles"], user.roles)) matching_role = list(filter(lambda x: x.id in roleping.bypass["roles"], bypass.roles))
if matching_role: if matching_role:
await ctx.send( await ctx.send(
f"{user.mention} already has bypass via {matching_role[0].mention}", f"{bypass.mention} already has bypass via {matching_role[0].mention}",
ephemeral=True, ephemeral=True,
) )
return return
roleping.bypass["users"].append(user.id) roleping.bypass["users"].append(bypass.id)
roleping.save() await roleping.commit()
await ctx.send(f"{user.display_name} user bypass added for `{rping.name}`") await ctx.send(f"{bypass.display_name} user bypass added for `{role.name}`")
@slash_command( @slash_command(
name="roleping", name="roleping",
@ -160,19 +164,23 @@ class RolepingCog(Scale):
sub_cmd_name="role", sub_cmd_name="role",
description="Add a role as a bypass to roleping", description="Add a role as a bypass to roleping",
) )
@slash_option(name="role", description="Role to add", opt_type=OptionTypes.ROLE, required=True)
@slash_option( @slash_option(
name="rping", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True name="bypass", description="Role to add", opt_type=OptionTypes.ROLE, required=True
)
@slash_option(
name="role", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True
) )
@check(admin_or_permissions(Permissions.MANAGE_GUILD)) @check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _roleping_bypass_role(self, ctx: InteractionContext, role: Role, rping: Role) -> None: async def _roleping_bypass_role(
roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=rping.id)) self, ctx: InteractionContext, bypass: Role, role: Role
) -> None:
roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=role.id))
if not roleping: if not roleping:
await ctx.send(f"Roleping not configured for {rping.mention}", ephemeral=True) await ctx.send(f"Roleping not configured for {role.mention}", ephemeral=True)
return return
if role.id in roleping.bypass["roles"]: if bypass.id in roleping.bypass["roles"]:
await ctx.send(f"{role.mention} already in bypass", ephemeral=True) await ctx.send(f"{bypass.mention} already in bypass", ephemeral=True)
return return
if len(roleping.bypass["roles"]) == 10: if len(roleping.bypass["roles"]) == 10:
@ -183,9 +191,9 @@ class RolepingCog(Scale):
) )
return return
roleping.bypass["roles"].append(role.id) roleping.bypass["roles"].append(bypass.id)
roleping.save() await roleping.commit()
await ctx.send(f"{role.name} role bypass added for `{rping.name}`") await ctx.send(f"{bypass.name} role bypass added for `{role.name}`")
@slash_command( @slash_command(
name="roleping", name="roleping",
@ -196,27 +204,27 @@ class RolepingCog(Scale):
sub_cmd_description="Remove a bypass from a roleping (restoring it)", sub_cmd_description="Remove a bypass from a roleping (restoring it)",
) )
@slash_option( @slash_option(
name="user", description="User to remove", opt_type=OptionTypes.USER, required=True name="bypass", description="User to remove", opt_type=OptionTypes.USER, required=True
) )
@slash_option( @slash_option(
name="rping", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True name="role", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True
) )
@check(admin_or_permissions(Permissions.MANAGE_GUILD)) @check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _roleping_restore_user( async def _roleping_restore_user(
self, ctx: InteractionContext, user: Member, rping: Role self, ctx: InteractionContext, bypass: Member, role: Role
) -> None: ) -> None:
roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=rping.id)) roleping: Roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=role.id))
if not roleping: if not roleping:
await ctx.send(f"Roleping not configured for {rping.mention}", ephemeral=True) await ctx.send(f"Roleping not configured for {role.mention}", ephemeral=True)
return return
if user.id not in roleping.bypass["users"]: if bypass.id not in roleping.bypass.users:
await ctx.send(f"{user.mention} not in bypass", ephemeral=True) await ctx.send(f"{bypass.mention} not in bypass", ephemeral=True)
return return
roleping.bypass["users"].delete(user.id) roleping.bypass.users.remove(bypass.id)
roleping.save() await roleping.commit()
await ctx.send(f"{user.display_name} user bypass removed for `{rping.name}`") await ctx.send(f"{bypass.display_name} user bypass removed for `{role.name}`")
@slash_command( @slash_command(
name="roleping", name="roleping",
@ -225,32 +233,24 @@ class RolepingCog(Scale):
description="Remove a bypass from a roleping (restoring it)", description="Remove a bypass from a roleping (restoring it)",
) )
@slash_option( @slash_option(
name="role", description="Role to remove", opt_type=OptionTypes.ROLE, required=True name="bypass", description="Role to remove", opt_type=OptionTypes.ROLE, required=True
) )
@slash_option( @slash_option(
name="rping", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True name="role", description="Rolepinged role", opt_type=OptionTypes.ROLE, required=True
) )
@check(admin_or_permissions(manage_guild=True)) @check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _roleping_restore_role( async def _roleping_restore_role(
self, ctx: InteractionContext, role: Role, rping: Role self, ctx: InteractionContext, bypass: Role, role: Role
) -> None: ) -> None:
roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=rping.id)) roleping: Roleping = await Roleping.find_one(q(guild=ctx.guild.id, role=role.id))
if not roleping: if not roleping:
await ctx.send(f"Roleping not configured for {rping.mention}", ephemeral=True) await ctx.send(f"Roleping not configured for {role.mention}", ephemeral=True)
return return
if role.id in roleping.bypass["roles"]: if bypass.id not in roleping.bypass.roles:
await ctx.send(f"{role.mention} already in bypass", ephemeral=True) await ctx.send(f"{bypass.mention} not in bypass", ephemeral=True)
return return
if len(roleping.bypass["roles"]) == 10: roleping.bypass.roles.remove(bypass.id)
await ctx.send( await roleping.commit()
"Already have 10 roles in bypass. " await ctx.send(f"{bypass.display_name} user bypass removed for `{role.name}`")
"Please consider consolidating roles for roleping bypass",
ephemeral=True,
)
return
roleping.bypass["roles"].append(role.id)
roleping.save()
await ctx.send(f"{role.name} role bypass added for `{rping.name}`")

View file

@ -1,6 +1,8 @@
"""J.A.R.V.I.S. WarningCog.""" """J.A.R.V.I.S. WarningCog."""
from dis_snek import InteractionContext, Permissions, Snake from dis_snek import InteractionContext, Permissions, Scale
from dis_snek.client.utils.misc_utils import get_all
from dis_snek.ext.paginators import Paginator from dis_snek.ext.paginators import Paginator
from dis_snek.models.discord.embed import EmbedField
from dis_snek.models.discord.user import User from dis_snek.models.discord.user import User
from dis_snek.models.snek.application_commands import ( from dis_snek.models.snek.application_commands import (
OptionTypes, OptionTypes,
@ -9,20 +11,16 @@ from dis_snek.models.snek.application_commands import (
slash_option, slash_option,
) )
from dis_snek.models.snek.command import check from dis_snek.models.snek.command import check
from jarvis_core.db.models import Warning
from jarvis.db.models import Warning
from jarvis.utils import build_embed from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog from jarvis.utils.embeds import warning_embed
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions from jarvis.utils.permissions import admin_or_permissions
class WarningCog(CacheCog): class WarningCog(Scale):
"""J.A.R.V.I.S. WarningCog.""" """J.A.R.V.I.S. WarningCog."""
def __init__(self, bot: Snake):
super().__init__(bot)
@slash_command(name="warn", description="Warn a user") @slash_command(name="warn", description="Warn a user")
@slash_option(name="user", description="User to warn", opt_type=OptionTypes.USER, required=True) @slash_option(name="user", description="User to warn", opt_type=OptionTypes.USER, required=True)
@slash_option( @slash_option(
@ -51,23 +49,15 @@ class WarningCog(CacheCog):
await ctx.send("Duration must be < 5 days", ephemeral=True) await ctx.send("Duration must be < 5 days", ephemeral=True)
return return
await ctx.defer() await ctx.defer()
_ = Warning( await Warning(
user=user.id, user=user.id,
reason=reason, reason=reason,
admin=ctx.author.id, admin=ctx.author.id,
guild=ctx.guild.id, guild=ctx.guild.id,
duration=duration, duration=duration,
active=True, active=True,
).save() ).commit()
fields = [Field("Reason", reason, False)] embed = warning_embed(user, reason)
embed = build_embed(
title="Warning",
description=f"{user.mention} has been warned",
fields=fields,
)
embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed) await ctx.send(embed=embed)
@slash_command(name="warnings", description="Get count of user warnings") @slash_command(name="warnings", description="Get count of user warnings")
@ -86,17 +76,19 @@ class WarningCog(CacheCog):
async def _warnings(self, ctx: InteractionContext, user: User, active: bool = 1) -> None: async def _warnings(self, ctx: InteractionContext, user: User, active: bool = 1) -> None:
active = bool(active) active = bool(active)
warnings = Warning.objects( warnings = (
await Warning.find(
user=user.id, user=user.id,
guild=ctx.guild.id, guild=ctx.guild.id,
).order_by("-created_at")
active_warns = Warning.objects(user=user.id, guild=ctx.guild.id, active=True).order_by(
"-created_at"
) )
.sort("created_at", -1)
.to_list(None)
)
active_warns = get_all(warnings, active=True)
pages = [] pages = []
if active: if active:
if active_warns.count() == 0: if len(active_warns) == 0:
embed = build_embed( embed = build_embed(
title="Warnings", title="Warnings",
description=f"{warnings.count()} total | 0 currently active", description=f"{warnings.count()} total | 0 currently active",
@ -113,7 +105,7 @@ class WarningCog(CacheCog):
if admin: if admin:
admin_name = f"{admin.username}#{admin.discriminator}" admin_name = f"{admin.username}#{admin.discriminator}"
fields.append( fields.append(
Field( EmbedField(
name=warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC"), name=warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC"),
value=f"{warn.reason}\nAdmin: {admin_name}\n\u200b", value=f"{warn.reason}\nAdmin: {admin_name}\n\u200b",
inline=False, inline=False,
@ -123,7 +115,7 @@ class WarningCog(CacheCog):
embed = build_embed( embed = build_embed(
title="Warnings", title="Warnings",
description=( description=(
f"{warnings.count()} total | {active_warns.count()} currently active" f"{len(warnings)} total | {len(active_warns)} currently active"
), ),
fields=fields[i : i + 5], fields=fields[i : i + 5],
) )
@ -140,7 +132,7 @@ class WarningCog(CacheCog):
title = "[A] " if warn.active else "[I] " title = "[A] " if warn.active else "[I] "
title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC") title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append( fields.append(
Field( EmbedField(
name=title, name=title,
value=warn.reason + "\n\u200b", value=warn.reason + "\n\u200b",
inline=False, inline=False,
@ -149,9 +141,7 @@ class WarningCog(CacheCog):
for i in range(0, len(fields), 5): for i in range(0, len(fields), 5):
embed = build_embed( embed = build_embed(
title="Warnings", title="Warnings",
description=( description=(f"{len(warnings)} total | {len(active_warns)} currently active"),
f"{warnings.count()} total | {active_warns.count()} currently active"
),
fields=fields[i : i + 5], fields=fields[i : i + 5],
) )
embed.set_author( embed.set_author(