185 lines
7.4 KiB
Python
185 lines
7.4 KiB
Python
"""Cog wrapper for command caching."""
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
|
|
from interactions import Client, Extension, InteractionContext
|
|
from interactions.models.discord.components import ActionRow, Button, ButtonStyle
|
|
from interactions.models.discord.embed import EmbedField
|
|
from interactions.models.discord.user import Member
|
|
from jarvis_core.db.models import Action, Ban, Kick, Modlog, Mute, Setting, Warning
|
|
from statipy.db import Stat
|
|
|
|
from jarvis.tracking import WarningMetadata
|
|
from jarvis.utils import build_embed
|
|
|
|
MODLOG_LOOKUP = {"Ban": Ban, "Kick": Kick, "Mute": Mute, "Warning": Warning}
|
|
IGNORE_COMMANDS = {
|
|
"Ban": ["bans", "unban"],
|
|
"Kick": [],
|
|
"Mute": ["unmute"],
|
|
"Warning": ["warnings"],
|
|
}
|
|
VERB_LOOKUP = {"Ban": "banned", "Kick": "kicked", "Mute": "muted", "Warning": "warned"}
|
|
|
|
|
|
class ModcaseCog(Extension):
|
|
"""Cog wrapper for moderation case logging."""
|
|
|
|
def __init__(self, bot: Client):
|
|
self.bot = bot
|
|
self.logger = logging.getLogger(__name__)
|
|
self.add_extension_postrun(self.log)
|
|
|
|
async def log(self, ctx: InteractionContext, *_args: list, **kwargs: dict) -> None: # noqa: C901
|
|
"""
|
|
Log a moderation activity in a moderation case.
|
|
|
|
Args:
|
|
ctx: Command context
|
|
|
|
"""
|
|
name = self.__name__.replace("Cog", "")
|
|
|
|
if name in MODLOG_LOOKUP and ctx.invoke_target not in IGNORE_COMMANDS[name]:
|
|
if name == "Warning":
|
|
md = WarningMetadata(
|
|
client_id=self.user.id,
|
|
client_name=self.client_name,
|
|
type="manual",
|
|
guild_id=ctx.guild.id,
|
|
guild_name=ctx.guild.name,
|
|
value=1,
|
|
)
|
|
await Stat(meta=md, name="warning").insert()
|
|
user = kwargs.pop("user", None)
|
|
if not user and not ctx.target_id:
|
|
self.logger.warning("Admin action %s missing user, exiting", name)
|
|
return
|
|
|
|
if isinstance(user, str):
|
|
user = await self.bot.fetch_user(user)
|
|
if not user:
|
|
self.logger.warning("User does not exist")
|
|
return
|
|
if ctx.target_id:
|
|
user = ctx.target
|
|
coll = MODLOG_LOOKUP.get(name, None)
|
|
if not coll:
|
|
self.logger.warning("Unsupported action %s, exiting", name)
|
|
return
|
|
|
|
action = await coll.find_one(
|
|
coll.user == user.id,
|
|
coll.guild == ctx.guild.id,
|
|
coll.active == True,
|
|
sort=[("_id", -1)],
|
|
)
|
|
if not action:
|
|
self.logger.warning("Missing action %s, exiting", name)
|
|
return
|
|
|
|
notify = await Setting.find_one(
|
|
Setting.guild == ctx.guild.id,
|
|
Setting.setting == "notify",
|
|
Setting.value == True,
|
|
)
|
|
if notify and name not in (
|
|
"Kick",
|
|
"Ban",
|
|
): # Ignore Kick and Ban, as these are unique
|
|
fields = (
|
|
EmbedField(name="Action Type", value=name, inline=False),
|
|
EmbedField(
|
|
name="Reason",
|
|
value=kwargs.get("reason", None) or "N/A",
|
|
inline=False,
|
|
),
|
|
)
|
|
embed = build_embed(
|
|
title="Admin action taken",
|
|
description=f"Admin action has been taken against you in {ctx.guild.name}",
|
|
fields=fields,
|
|
)
|
|
if name == "Mute":
|
|
mts = int(user.communication_disabled_until.timestamp())
|
|
embed.add_field(name="Muted Until", value=f"<t:{mts}:F> (<t:{mts}:R>)")
|
|
guild_url = f"https://discord.com/channels/{ctx.guild.id}"
|
|
embed.set_author(name=ctx.guild.name, icon_url=ctx.guild.icon.url, url=guild_url)
|
|
embed.set_thumbnail(url=ctx.guild.icon.url)
|
|
try:
|
|
await user.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.debug("User not warned of action due to closed DMs")
|
|
|
|
modlog = await Modlog.find_one(
|
|
Modlog.user == user.id,
|
|
Modlog.guild == ctx.guild.id,
|
|
Modlog.open == True,
|
|
)
|
|
|
|
if modlog:
|
|
m_action = Action(action_type=name.lower(), parent=action.id)
|
|
modlog.actions.append(m_action)
|
|
await modlog.save()
|
|
return
|
|
|
|
modlog = await Setting.find_one(Setting.guild == ctx.guild.id, Setting.setting == "modlog")
|
|
if not modlog:
|
|
return
|
|
|
|
channel = await ctx.guild.fetch_channel(modlog.value)
|
|
if channel:
|
|
fields = (
|
|
EmbedField(name="Action Type", value=name, inline=False),
|
|
EmbedField(
|
|
name="Reason",
|
|
value=kwargs.get("reason", None) or "N/A",
|
|
inline=False,
|
|
),
|
|
EmbedField(name="Admin", value=ctx.author.mention, inline=False),
|
|
)
|
|
embed = build_embed(
|
|
title="Admin action taken",
|
|
description=f"Admin action has been taken against {user.mention}",
|
|
fields=fields,
|
|
)
|
|
embed.set_author(name=f"{user.username}", icon_url=user.display_avatar.url)
|
|
embed.set_footer(text=f"User ID: {user.id}")
|
|
if name == "Mute":
|
|
mts = int(user.communication_disabled_until.timestamp())
|
|
embed.add_field(name="Muted Until", value=f"<t:{mts}:F> (<t:{mts}:R>)")
|
|
await channel.send(embeds=embed)
|
|
|
|
lookup_key = f"{user.id}|{ctx.guild.id}"
|
|
|
|
async with self.bot.redis.lock("lock|" + lookup_key):
|
|
if await self.bot.redis.get(lookup_key):
|
|
self.logger.debug(f"User {user.id} in {ctx.guild.id} already has pending case")
|
|
return
|
|
|
|
channel = await ctx.guild.fetch_channel(modlog.value)
|
|
if not channel:
|
|
self.logger.warn(f"Guild {ctx.guild.id} modlog channel no longer exists, deleting")
|
|
await modlog.delete()
|
|
return
|
|
|
|
embed = build_embed(
|
|
title="Recent Action Taken",
|
|
description=f"Would you like to open a moderation case for {user.mention}?",
|
|
fields=[],
|
|
)
|
|
avatar_url = user.avatar.url
|
|
if isinstance(user, Member):
|
|
avatar_url = user.display_avatar.url
|
|
embed.set_author(name=user.username, icon_url=avatar_url)
|
|
components = [
|
|
ActionRow(
|
|
Button(style=ButtonStyle.RED, emoji="✖️", custom_id="modcase|no"),
|
|
Button(style=ButtonStyle.GREEN, emoji="✔️", custom_id="modcase|yes"),
|
|
)
|
|
]
|
|
message = await channel.send(embeds=embed, components=components)
|
|
|
|
await self.bot.redis.set(lookup_key, f"{name.lower()}|{action.id}", ex=timedelta(days=7))
|
|
await self.bot.redis.set(f"msg|{message.id}", user.id, ex=timedelta(days=7))
|