jarvis-bot/jarvis/cogs/modlog/member.py

333 lines
13 KiB
Python

"""J.A.R.V.I.S. ModlogMemberCog."""
import asyncio
from datetime import datetime, timedelta
import discord
from discord.ext import commands
from discord.utils import find
from jarvis.cogs.modlog.utils import get_latest_log, modlog_embed
from jarvis.config import get_config
from jarvis.db.models import Ban, Kick, Mute, Setting, Unban
from jarvis.utils import build_embed
from jarvis.utils.field import Field
class ModlogMemberCog(commands.Cog):
"""J.A.R.V.I.S. ModlogMemberCog."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.cache = []
@commands.Cog.listener()
async def on_member_ban(self, guild: discord.Guild, user: discord.User) -> None:
"""Process on_member_ban events."""
modlog = Setting.objects(guild=guild.id, setting="modlog").first()
if modlog:
channel = guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await guild.audit_logs(
limit=50,
action=discord.AuditLogAction.ban,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
ban = (
Ban.objects(
guild=guild.id,
user=user.id,
active=True,
)
.order_by("-created_at")
.first()
)
if ban:
admin = guild.get_member(ban.admin)
embed = modlog_embed(
user,
admin,
log,
"User banned",
f"{user.mention} was banned from {guild.name}",
)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_unban(self, guild: discord.Guild, user: discord.User) -> None:
"""Process on_member_unban events."""
modlog = Setting.objects(guild=guild.id, setting="modlog").first()
if modlog:
channel = guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await guild.audit_logs(
limit=50,
action=discord.AuditLogAction.unban,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
unban = (
Unban.objects(
guild=guild.id,
user=user.id,
)
.order_by("-created_at")
.first()
)
admin = guild.get_member(unban.admin)
embed = modlog_embed(
user,
admin,
log,
"User unbanned",
f"{user.mention} was unbanned from {guild.name}",
)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_remove(self, user: discord.Member) -> None:
"""Process on_member_remove events."""
modlog = Setting.objects(guild=user.guild.id, setting="modlog").first()
if modlog:
channel = user.guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await user.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.kick,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
count = 0
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
while not log:
if count == 30:
break
await asyncio.sleep(0.5)
log: discord.AuditLogEntry = get_latest_log(auditlog, user)
count += 1
if not log:
return
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
kick = (
Kick.objects(
guild=user.guild.id,
user=user.id,
)
.order_by("-created_at")
.first()
)
if kick:
admin = user.guild.get_member(kick.admin)
embed = modlog_embed(
user,
admin,
log,
"User Kicked",
f"{user.mention} was kicked from {user.guild.name}",
)
await channel.send(embed=embed)
async def process_mute(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process mute event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
mute = (
Mute.objects(
guild=before.guild.id,
user=before.id,
active=True,
)
.order_by("-created_at")
.first()
)
if mute:
admin = before.guild.get_member(mute.admin)
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Muted",
desc=f"{before.mention} was muted",
)
async def process_unmute(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process unmute event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
if admin.id == get_config().client_id:
await asyncio.sleep(3)
mute = (
Mute.objects(
guild=before.guild.id,
user=before.id,
active=True,
)
.order_by("-created_at")
.first()
)
if mute:
admin = before.guild.get_member(mute.admin)
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Muted",
desc=f"{before.mention} was muted",
)
async def process_verify(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process verification event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
return modlog_embed(
member=before,
admin=admin,
log=log,
title="User Verified",
desc=f"{before.mention} was verified",
)
async def process_rolechange(self, before: discord.Member, after: discord.Member) -> discord.Embed:
"""Process rolechange event."""
await asyncio.sleep(0.5) # Need to wait for audit log
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_role_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
admin: discord.User = log.user
role = None
title = "User Given Role"
verb = "was given"
if len(before.roles) > len(after.roles):
title = "User Forfeited Role"
verb = "forfeited"
role = find(lambda x: x not in after.roles, before.roles)
elif len(before.roles) < len(after.roles):
role = find(lambda x: x not in before.roles, after.roles)
role_text = role.mention if role else "||`[redacted]`||"
return modlog_embed(
member=before,
admin=admin,
log=log,
title=title,
desc=f"{before.mention} {verb} role {role_text}",
)
@commands.Cog.listener()
async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:
"""Process on_member_update events.
Caches events due to double-send bug
"""
h = hash(hash(before) * hash(after))
if h not in self.cache:
self.cache.append(h)
else:
return
modlog = Setting.objects(guild=before.guild.id, setting="modlog").first()
if modlog:
channel = after.guild.get_channel(modlog.value)
await asyncio.sleep(0.5) # Need to wait for audit log
embed = None
mute = Setting.objects(guild=before.guild.id, setting="mute").first()
verified = Setting.objects(guild=before.guild.id, setting="verified").first()
mute_role = None
verified_role = None
if mute:
mute_role = before.guild.get_role(mute.value)
if verified:
verified_role = before.guild.get_role(verified.value)
if mute and mute_role in after.roles and mute_role not in before.roles:
embed = await self.process_mute(before, after)
elif mute and mute_role in before.roles and mute_role not in after.roles:
embed = await self.process_unmute(before, after)
elif verified and verified_role not in before.roles and verified_role in after.roles:
embed = await self.process_verify(before, after)
elif before.nick != after.nick:
auditlog = await before.guild.audit_logs(
limit=50,
action=discord.AuditLogAction.member_update,
after=datetime.utcnow() - timedelta(seconds=15),
oldest_first=False,
).flatten()
log: discord.AuditLogEntry = get_latest_log(auditlog, before)
bname = before.nick if before.nick else before.name
aname = after.nick if after.nick else after.name
fields = [
Field(
name="Before",
value=f"{bname} ({before.name}#{before.discriminator})",
),
Field(
name="After",
value=f"{aname} ({after.name}#{after.discriminator})",
),
]
if log.user.id != before.id:
fields.append(
Field(
name="Moderator",
value=f"{log.user.mention} ({log.user.name}#{log.user.discriminator})",
)
)
if log.reason:
fields.append(
Field(name="Reason", value=log.reason, inline=False),
)
embed = build_embed(
title="User Nick Changed",
description=f"{after.mention} changed their nickname",
color="#fc9e3f",
fields=fields,
timestamp=log.created_at,
)
embed.set_author(
name=f"{after.name}",
icon_url=after.avatar_url,
)
embed.set_footer(text=f"{after.name}#{after.discriminator} | {after.id}")
elif len(before.roles) != len(after.roles):
# TODO: User got a new role
embed = await self.process_rolechange(before, after)
if embed:
await channel.send(embed=embed)
self.cache.remove(h)