Update modlog, member needs full re-work

This commit is contained in:
Zeva Rose 2022-02-21 14:07:09 -07:00
parent cbf6587dbf
commit 90506c136e
4 changed files with 46 additions and 386 deletions

View file

@ -1,11 +1,11 @@
"""J.A.R.V.I.S. Modlog Cogs."""
from discord.ext.commands import Bot
from dis_snek import Snake
from jarvis.cogs.modlog import command, member, message
from jarvis.cogs.modlog import command, message
def setup(bot: Bot) -> None:
def setup(bot: Snake) -> None:
"""Add modlog cogs to J.A.R.V.I.S."""
command.ModlogCommandCog(bot)
member.ModlogMemberCog(bot)
# member.ModlogMemberCog(bot)
message.ModlogMessageCog(bot)

View file

@ -1,332 +1 @@
"""J.A.R.V.I.S. ModlogMemberCog."""
import asyncio
from datetime import datetime, timedelta
import discord
from discord.ext import commands
from discord.utils import find
from jarvis.cogs.modlog.utils import get_latest_log, modlog_embed
from jarvis.config import get_config
from jarvis.db.models import Ban, Kick, Mute, Setting, Unban
from jarvis.utils import build_embed
from jarvis.utils.field import Field
class ModlogMemberCog(commands.Cog):
"""J.A.R.V.I.S. ModlogMemberCog."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.cache = []
@commands.Cog.listener()
async def on_member_ban(self, guild: discord.Guild, user: discord.User) -> None:
"""Process on_member_ban events."""
modlog = Setting.objects(guild=guild.id, setting="modlog").first()
if modlog:
channel = guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await guild.audit_logs(
limit=50,
action=discord.AuditLogAction.ban,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
ban = (
Ban.objects(
guild=guild.id,
user=user.id,
active=True,
)
.order_by("-created_at")
.first()
)
if ban:
admin = guild.get_member(ban.admin)
embed = modlog_embed(
user,
admin,
log,
"User banned",
f"{user.mention} was banned from {guild.name}",
)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_unban(self, guild: discord.Guild, user: discord.User) -> None:
"""Process on_member_unban events."""
modlog = Setting.objects(guild=guild.id, setting="modlog").first()
if modlog:
channel = guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await guild.audit_logs(
limit=50,
action=discord.AuditLogAction.unban,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
unban = (
Unban.objects(
guild=guild.id,
user=user.id,
)
.order_by("-created_at")
.first()
)
admin = guild.get_member(unban.admin)
embed = modlog_embed(
user,
admin,
log,
"User unbanned",
f"{user.mention} was unbanned from {guild.name}",
)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_remove(self, user: discord.Member) -> None:
"""Process on_member_remove events."""
modlog = Setting.objects(guild=user.guild.id, setting="modlog").first()
if modlog:
channel = user.guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await user.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.kick,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
count = 0
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
while not log:
if count == 30:
break
await asyncio.sleep(0.5)
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
count += 1
if not log:
return
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
kick = (
Kick.objects(
guild=user.guild.id,
user=user.id,
)
.order_by("-created_at")
.first()
)
if kick:
admin = user.guild.get_member(kick.admin)
embed = modlog_embed(
user,
admin,
log,
"User Kicked",
f"{user.mention} was kicked from {user.guild.name}",
)
await channel.send(embed=embed)
async def process_mute(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process mute event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
mute = (
Mute.objects(
guild=before.guild.id,
user=before.id,
active=True,
)
.order_by("-created_at")
.first()
)
if mute:
admin = before.guild.get_member(mute.admin)
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Muted",
desc=f"{before.mention} was muted",
)
async def process_unmute(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process unmute event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
mute = (
Mute.objects(
guild=before.guild.id,
user=before.id,
active=True,
)
.order_by("-created_at")
.first()
)
if mute:
admin = before.guild.get_member(mute.admin)
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Muted",
desc=f"{before.mention} was muted",
)
async def process_verify(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process verification event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Verified",
desc=f"{before.mention} was verified",
)
async def process_rolechange(
self, before: discord.Member, after: discord.Member
) -> discord.Embed:
"""Process rolechange event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
role = None
title = "User Given Role"
verb = "was given"
if len(before.roles) > len(after.roles):
title = "User Forfeited Role"
verb = "forfeited"
role = find(lambda x: x not in after.roles, before.roles)
elif len(before.roles) < len(after.roles):
role = find(lambda x: x not in before.roles, after.roles)
role_text = role.mention if role else "||`[redacted]`||"
return modlog_embed(
member=before,
admin=admin,
log=log,
title=title,
desc=f"{before.mention} {verb} role {role_text}",
)
@commands.Cog.listener()
async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:
"""Process on_member_update events.
Caches events due to double-send bug
"""
h = hash(hash(before) * hash(after))
if h not in self.cache:
self.cache.append(h)
else:
return
modlog = Setting.objects(guild=before.guild.id, setting="modlog").first()
if modlog:
channel = after.guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
embed = None
mute = Setting.objects(guild=before.guild.id, setting="mute").first()
verified = Setting.objects(guild=before.guild.id, setting="verified").first()
mute_role = None
verified_role = None
if mute:
mute_role = before.guild.get_role(mute.value)
if verified:
verified_role = before.guild.get_role(verified.value)
if mute and mute_role in after.roles and mute_role not in before.roles:
embed = await self.process_mute(before, after)
elif mute and mute_role in before.roles and mute_role not in after.roles:
embed = await self.process_unmute(before, after)
elif verified and verified_role not in before.roles and verified_role in after.roles:
embed = await self.process_verify(before, after)
elif before.nick != after.nick:
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
bname = before.nick if before.nick else before.name
aname = after.nick if after.nick else after.name
fields = [
Field(
name="Before",
value=f"{bname} ({before.name}#{before.discriminator})",
),
Field(
name="After",
value=f"{aname} ({after.name}#{after.discriminator})",
),
]
if log.user.id != before.id:
fields.append(
Field(
name="Moderator",
value=f"{log.user.mention} ({log.user.name}#{log.user.discriminator})",
)
)
if log.reason:
fields.append(
Field(name="Reason", value=log.reason, inline=False),
)
embed = build_embed(
title="User Nick Changed",
description=f"{after.mention} changed their nickname",
color="#fc9e3f",
fields=fields,
timestamp=log.created_at,
)
embed.set_author(name=f"{after.name}", icon_url=after.display_avatar.url)
embed.set_footer(text=f"{after.name}#{after.discriminator} | {after.id}")
elif len(before.roles) != len(after.roles):
# TODO: User got a new role
embed = await self.process_rolechange(before, after)
if embed:
await channel.send(embed=embed)
self.cache.remove(h)
"""JARVIS member modlog processing."""

View file

@ -1,34 +1,37 @@
"""J.A.R.V.I.S. ModlogMessageCog."""
import discord
from discord.ext import commands
from dis_snek import Scale, Snake, listen
from dis_snek.api.events.discord import MessageDelete, MessageUpdate
from dis_snek.models.discord.embed import EmbedField
from jarvis_core.db import q
from jarvis_core.db.models import Setting
from jarvis.db.models import Setting
from jarvis.utils import build_embed
from jarvis.utils.field import Field
class ModlogMessageCog(commands.Cog):
class ModlogMessageCog(Scale):
"""J.A.R.V.I.S. ModlogMessageCog."""
def __init__(self, bot: commands.Bot):
def __init__(self, bot: Snake):
self.bot = bot
@commands.Cog.listener()
async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:
@listen()
async def on_message_edit(self, event: MessageUpdate) -> None:
"""Process on_message_edit events."""
before = event.before
after = event.after
if not before.author.bot:
modlog = Setting.objects(guild=after.guild.id, setting="modlog").first()
modlog = await Setting.find_one(q(guild=after.guild.id, setting="modlog"))
if modlog:
if before.content == after.content or before.content is None:
return
channel = before.guild.get_channel(modlog.value)
fields = [
Field(
EmbedField(
"Original Message",
before.content if before.content else "N/A",
False,
),
Field(
EmbedField(
"New Message",
after.content if after.content else "N/A",
False,
@ -39,40 +42,41 @@ class ModlogMessageCog(commands.Cog):
description=f"{before.author.mention} edited a message",
fields=fields,
color="#fc9e3f",
timestamp=after.edited_at,
timestamp=after.edited_timestamp,
url=after.jump_url,
)
embed.set_author(
name=before.author.name,
name=before.author.username,
icon_url=before.author.display_avatar.url,
url=after.jump_url,
)
embed.set_footer(
text=f"{before.author.name}#{before.author.discriminator} | {before.author.id}"
text=f"{before.author.username}#{before.author.discriminator} | {before.author.id}"
)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_message_delete(self, message: discord.Message) -> None:
@listen()
async def on_message_delete(self, event: MessageDelete) -> None:
"""Process on_message_delete events."""
modlog = Setting.objects(guild=message.guild.id, setting="modlog").first()
message = event.message
modlog = await Setting.find_one(q(guild=message.guild.id, setting="modlog"))
if modlog:
fields = [Field("Original Message", message.content or "N/A", False)]
fields = [EmbedField("Original Message", message.content or "N/A", False)]
if message.attachments:
value = "\n".join([f"[{x.filename}]({x.url})" for x in message.attachments])
fields.append(
Field(
EmbedField(
name="Attachments",
value=value,
inline=False,
)
)
if message.stickers:
value = "\n".join([f"[{x.name}]({x.image_url})" for x in message.stickers])
if message.sticker_items:
value = "\n".join([f"Sticker: {x.name}" for x in message.sticker_items])
fields.append(
Field(
EmbedField(
name="Stickers",
value=value,
inline=False,
@ -82,7 +86,7 @@ class ModlogMessageCog(commands.Cog):
if message.embeds:
value = str(len(message.embeds)) + " embeds"
fields.append(
Field(
EmbedField(
name="Embeds",
value=value,
inline=False,
@ -98,11 +102,11 @@ class ModlogMessageCog(commands.Cog):
)
embed.set_author(
name=message.author.name,
name=message.author.username,
icon_url=message.author.display_avatar.url,
url=message.jump_url,
)
embed.set_footer(
text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}"
text=f"{message.author.username}#{message.author.discriminator} | {message.author.id}"
)
await channel.send(embed=embed)

View file

@ -1,31 +1,27 @@
"""J.A.R.V.I.S. Modlog Cog Utilities."""
from datetime import datetime, timedelta
from typing import List
import discord
from discord import AuditLogEntry, Member
from discord.utils import find
from dis_snek.models.discord.embed import Embed, EmbedField
from dis_snek.models.discord.guild import AuditLogEntry
from dis_snek.models.discord.user import Member
from jarvis.utils import build_embed
from jarvis.utils.field import Field
def modlog_embed(
member: discord.Member,
admin: discord.Member,
log: discord.AuditLogEntry,
member: Member,
admin: Member,
log: AuditLogEntry,
title: str,
desc: str,
) -> discord.Embed:
) -> Embed:
"""Get modlog embed."""
fields = [
Field(
EmbedField(
name="Moderator",
value=f"{admin.mention} ({admin.name}#{admin.discriminator})",
value=f"{admin.mention} ({admin.username}#{admin.discriminator})",
),
]
if log.reason:
fields.append(Field(name="Reason", value=log.reason, inline=False))
fields.append(EmbedField(name="Reason", value=log.reason, inline=False))
embed = build_embed(
title=title,
description=desc,
@ -33,15 +29,6 @@ def modlog_embed(
fields=fields,
timestamp=log.created_at,
)
embed.set_author(name=f"{member.name}", icon_url=member.display_avatar.url)
embed.set_footer(text=f"{member.name}#{member.discriminator} | {member.id}")
embed.set_author(name=f"{member.username}", icon_url=member.display_avatar.url)
embed.set_footer(text=f"{member.username}#{member.discriminator} | {member.id}")
return embed
def get_latest_log(auditlog: List[AuditLogEntry], target: Member) -> AuditLogEntry:
"""Filter AuditLog to get latest entry."""
before = datetime.utcnow() - timedelta(seconds=10)
return find(
lambda x: x.target.id == target.id and x.created_at > before,
auditlog,
)