332 lines
12 KiB
Python
332 lines
12 KiB
Python
"""JARVIS Moderation Case management."""
|
|
from typing import TYPE_CHECKING, List, Optional
|
|
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Modlog, Note, actions
|
|
from naff import Extension, InteractionContext, Permissions
|
|
from naff.ext.paginators import Paginator
|
|
from naff.models.discord.embed import Embed, EmbedField
|
|
from naff.models.discord.user import Member
|
|
from naff.models.naff.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from naff.models.naff.command import check
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
if TYPE_CHECKING:
|
|
from naff.models.discord.guild import Guild
|
|
|
|
# Maps the ``action_type`` string stored on a case action to the model class
# that is queried (via ``find_one``) for the originating moderation action.
ACTIONS_LOOKUP = {
    "ban": actions.Ban,
    "kick": actions.Kick,
    "mute": actions.Mute,
    "unban": actions.Unban,
    "warning": actions.Warning,
}
|
|
|
|
|
|
class CaseCog(Extension):
    """
    JARVIS CaseCog.

    Slash commands for listing, viewing, opening, closing, reopening and
    annotating moderation cases (``Modlog`` documents).
    """

    # Rendered tables are wrapped in an ```ansi fence and followed by a
    # "... and N more" trailer before being used as an embed field value.
    # Discord rejects field values longer than 1024 characters, so both
    # cut-offs leave headroom for that wrapper text.
    _ACTION_OUTPUT_LIMIT = 800
    _NOTE_OUTPUT_LIMIT = 950

    # Maximum length accepted for a single case note.
    _NOTE_MAX_LENGTH = 50

    @staticmethod
    def _user_tag(user) -> str:
        """Return ``username#discriminator`` for a user, or ``[N/A]`` when the user is missing."""
        if user:
            return f"{user.username}#{user.discriminator}"
        return "[N/A]"

    @staticmethod
    def _render_table(console: Console, table: Table) -> str:
        """Render a rich table to a string through the console's capture buffer."""
        with console.capture() as capture:
            console.print(table)
        return capture.get()

    async def _find_case(self, ctx: InteractionContext, cid: str) -> Optional[Modlog]:
        """
        Look up a case by ID within the invoking guild.

        Sends an ephemeral "not found" reply and returns None when no case matches,
        so callers only need to return early on a falsy result.

        Args:
            ctx: Interaction context of the invoking command
            cid: Case ID (nanoid) to look up
        """
        found = await Modlog.find_one(q(guild=ctx.guild.id, nanoid=cid))
        if not found:
            await ctx.send(f"Could not find case with ID {cid}", ephemeral=True)
        return found

    async def get_summary_embed(self, mod_case: Modlog, guild: "Guild") -> Embed:
        """
        Get Moderation case summary embed.

        Builds one table of the case's actions and one of its notes, each truncated
        with a "... and N more" trailer once the rendered text reaches its size
        limit. Actions whose parent record cannot be found are flagged as orphaned,
        and the case is committed before returning.

        Args:
            mod_case: Moderation case
            guild: Originating guild (not used by this method)

        Returns:
            Embed summarizing the case
        """
        console = Console()

        action_table = Table()
        action_table.add_column(header="Type", justify="left", style="orange4", no_wrap=True)
        action_table.add_column(header="Admin", justify="left", style="cyan", no_wrap=True)
        action_table.add_column(header="Reason", justify="left", style="white")

        action_output = ""
        action_output_extra = ""
        for idx, action in enumerate(mod_case.actions):
            parent_action = await ACTIONS_LOOKUP[action.action_type].find_one(q(id=action.parent))
            if not parent_action:
                # The originating record no longer exists; still list the action.
                action.orphaned = True
                action_table.add_row(action.action_type.title(), "[N/A]", "[N/A]")
            else:
                admin = await self.bot.fetch_user(parent_action.admin)
                action_table.add_row(
                    action.action_type.title(), self._user_tag(admin), parent_action.reason
                )

            rendered = self._render_table(console, action_table)
            if len(rendered) >= self._ACTION_OUTPUT_LIMIT:
                # Keep the previous (shorter) rendering; this row and the rest are omitted.
                action_output_extra = f"... and {len(mod_case.actions) - idx} more actions"
                break
            action_output = rendered

        note_table = Table()
        note_table.add_column(header="Admin", justify="left", style="cyan", no_wrap=True)
        note_table.add_column(header="Content", justify="left", style="white")

        note_output = ""
        note_output_extra = ""
        notes = sorted(mod_case.notes, key=lambda x: x.created_at)
        for idx, note in enumerate(notes):
            admin = await self.bot.fetch_user(note.admin)
            note_table.add_row(self._user_tag(admin), note.content)

            rendered = self._render_table(console, note_table)
            if len(rendered) >= self._NOTE_OUTPUT_LIMIT:
                note_output_extra = f"... and {len(notes) - idx} more notes"
                break
            note_output = rendered

        status = "Open" if mod_case.open else "Closed"

        user = await self.bot.fetch_user(mod_case.user)
        username = self._user_tag(user)
        user_text = user.mention if user else "[N/A]"

        admin = await self.bot.fetch_user(mod_case.admin)
        admin_text = admin.mention if admin else "[N/A]"

        action_output = f"```ansi\n{action_output}\n{action_output_extra}\n```"
        note_output = f"```ansi\n{note_output}\n{note_output_extra}\n```"

        fields = (
            EmbedField(
                name="Actions", value=action_output if mod_case.actions else "No Actions Found"
            ),
            EmbedField(name="Notes", value=note_output if mod_case.notes else "No Notes Found"),
        )

        embed = build_embed(
            title=f"Moderation Case [`{mod_case.nanoid}`]",
            description=f"{status} case against {user_text} [**opened by {admin_text}**]",
            fields=fields,
            timestamp=mod_case.created_at,
        )
        icon_url = user.avatar.url if user else None

        embed.set_author(name=username, icon_url=icon_url)
        embed.set_footer(text=str(mod_case.user))

        # Persist any orphaned flags set while building the action table.
        await mod_case.commit()
        return embed

    async def get_action_embeds(self, mod_case: Modlog, guild: "Guild") -> List[Embed]:
        """
        Get Moderation case action embeds.

        Produces one embed per action whose parent record can still be found.
        Actions already flagged as orphaned are skipped, newly discovered orphans
        are flagged, and the case is committed before returning.

        Args:
            mod_case: Moderation case
            guild: Originating guild (not used by this method)

        Returns:
            List of embeds, possibly empty
        """
        embeds = []
        user = await self.bot.fetch_user(mod_case.user)
        username = self._user_tag(user)
        user_mention = user.mention if user else "[N/A]"
        avatar_url = user.avatar.url if user else None

        for action in mod_case.actions:
            if action.orphaned:
                continue
            parent_action = await ACTIONS_LOOKUP[action.action_type].find_one(q(id=action.parent))
            if not parent_action:
                action.orphaned = True
                continue

            admin = await self.bot.fetch_user(parent_action.admin)
            admin_text = admin.mention if admin else "[N/A]"

            fields = (EmbedField(name=action.action_type.title(), value=parent_action.reason),)
            embed = build_embed(
                title="Moderation Case Action",
                description=f"{admin_text} initiated an action against {user_mention}",
                fields=fields,
                timestamp=parent_action.created_at,
            )
            embed.set_author(name=username, icon_url=avatar_url)
            embeds.append(embed)

        # Persist any orphaned flags set in the loop above.
        await mod_case.commit()
        return embeds

    cases = SlashCommand(name="cases", description="Manage moderation cases")

    @cases.subcommand(sub_cmd_name="list", sub_cmd_description="List moderation cases")
    @slash_option(
        name="user",
        description="User to filter cases to",
        opt_type=OptionTypes.USER,
        required=False,
    )
    @slash_option(
        name="closed",
        description="Include closed cases",
        opt_type=OptionTypes.BOOLEAN,
        required=False,
    )
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _cases_list(
        self, ctx: InteractionContext, user: Optional[Member] = None, closed: bool = False
    ) -> None:
        """List the guild's cases, newest first, optionally filtered by user and open state."""
        query = q(guild=ctx.guild.id)
        if not closed:
            query.update(q(open=True))
        if user:
            query.update(q(user=user.id))
        cases = await Modlog.find(query).sort("created_at", -1).to_list(None)

        if not cases:
            await ctx.send("No cases to view", ephemeral=True)
            return

        pages = [await self.get_summary_embed(c, ctx.guild) for c in cases]
        paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
        await paginator.send(ctx)

    case = SlashCommand(name="case", description="Manage a moderation case")
    show = case.group(name="show", description="Show information about a specific case")

    @show.subcommand(sub_cmd_name="summary", sub_cmd_description="Summarize a specific case")
    @slash_option(name="cid", description="Case ID", opt_type=OptionTypes.STRING, required=True)
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_show_summary(self, ctx: InteractionContext, cid: str) -> None:
        """Send the summary embed for a single case."""
        case = await self._find_case(ctx, cid)
        if not case:
            return

        embed = await self.get_summary_embed(case, ctx.guild)
        await ctx.send(embed=embed)

    @show.subcommand(sub_cmd_name="actions", sub_cmd_description="Get case actions")
    @slash_option(name="cid", description="Case ID", opt_type=OptionTypes.STRING, required=True)
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_show_actions(self, ctx: InteractionContext, cid: str) -> None:
        """Send a paginated view of the actions attached to a single case."""
        case = await self._find_case(ctx, cid)
        if not case:
            return

        pages = await self.get_action_embeds(case, ctx.guild)
        if not pages:
            # A case may have no actions (or only orphaned ones); a paginator
            # cannot be built from zero pages.
            await ctx.send("No actions found for this case", ephemeral=True)
            return

        paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
        await paginator.send(ctx)

    @case.subcommand(sub_cmd_name="close", sub_cmd_description="Close a specific case")
    @slash_option(name="cid", description="Case ID", opt_type=OptionTypes.STRING, required=True)
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_close(self, ctx: InteractionContext, cid: str) -> None:
        """Mark a case as closed and send its updated summary."""
        case = await self._find_case(ctx, cid)
        if not case:
            return

        case.open = False
        await case.commit()

        embed = await self.get_summary_embed(case, ctx.guild)
        await ctx.send(embed=embed)

    @case.subcommand(sub_cmd_name="reopen", sub_cmd_description="Reopen a specific case")
    @slash_option(name="cid", description="Case ID", opt_type=OptionTypes.STRING, required=True)
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_reopen(self, ctx: InteractionContext, cid: str) -> None:
        """Mark a case as open again and send its updated summary."""
        case = await self._find_case(ctx, cid)
        if not case:
            return

        case.open = True
        await case.commit()

        embed = await self.get_summary_embed(case, ctx.guild)
        await ctx.send(embed=embed)

    @case.subcommand(sub_cmd_name="note", sub_cmd_description="Add a note to a specific case")
    @slash_option(name="cid", description="Case ID", opt_type=OptionTypes.STRING, required=True)
    @slash_option(
        name="note", description="Note to add", opt_type=OptionTypes.STRING, required=True
    )
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_note(self, ctx: InteractionContext, cid: str, note: str) -> None:
        """Append a note to an open case and send its updated summary."""
        case = await self._find_case(ctx, cid)
        if not case:
            return

        if not case.open:
            await ctx.send("Case is closed, please re-open to add a new comment", ephemeral=True)
            return

        if len(note) > self._NOTE_MAX_LENGTH:
            await ctx.send(f"Note must be <= {self._NOTE_MAX_LENGTH} characters", ephemeral=True)
            return

        new_note = Note(admin=ctx.author.id, content=note)

        case.notes.append(new_note)
        await case.commit()

        embed = await self.get_summary_embed(case, ctx.guild)
        await ctx.send(embed=embed)

    @case.subcommand(sub_cmd_name="new", sub_cmd_description="Open a new case")
    @slash_option(name="user", description="Target user", opt_type=OptionTypes.USER, required=True)
    @slash_option(
        name="note", description="Note to add", opt_type=OptionTypes.STRING, required=True
    )
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _case_new(self, ctx: InteractionContext, user: Member, note: str) -> None:
        """Open a new case against a guild member, unless one is already open for them."""
        existing = await Modlog.find_one(q(guild=ctx.guild.id, user=user.id, open=True))
        if existing:
            await ctx.send(f"Case already open with ID `{existing.nanoid}`", ephemeral=True)
            return

        if not isinstance(user, Member):
            await ctx.send("User must be in this guild", ephemeral=True)
            return

        if len(note) > self._NOTE_MAX_LENGTH:
            await ctx.send(f"Note must be <= {self._NOTE_MAX_LENGTH} characters", ephemeral=True)
            return

        first_note = Note(admin=ctx.author.id, content=note)

        case = Modlog(
            user=user.id, guild=ctx.guild.id, admin=ctx.author.id, notes=[first_note], actions=[]
        )
        await case.commit()
        # Reload so fields filled in on save are present before rendering.
        # NOTE(review): presumably this is what populates `nanoid` - confirm against the model.
        await case.reload()

        embed = await self.get_summary_embed(case, ctx.guild)
        await ctx.send(embed=embed)
|