474 lines
18 KiB
Python
474 lines
18 KiB
Python
import asyncio
from datetime import datetime, timedelta
from typing import Optional

import discord
import pymongo
from discord import DMChannel
from discord.ext import commands
from discord.utils import find
from discord_slash import SlashContext

import jarvis
from jarvis.config import get_config
from jarvis.utils import build_embed
from jarvis.utils.db import DBManager
from jarvis.utils.field import Field
|
|
|
|
|
|
class ModlogCog(commands.Cog):
    """
    A hybrid user/modlog functionality for J.A.R.V.I.S.

    Listens for member, message and slash-command events and posts a
    summary embed to the channel stored under the guild's ``modlog``
    setting. Guilds without that setting (or whose configured channel can
    no longer be resolved) are silently skipped.
    """

    # Longest field value accepted in an embed; longer values are clipped
    # before sending so that long messages do not make the send fail.
    _FIELD_LIMIT = 1024

    def __init__(self, bot: discord.ext.commands.Bot):
        self.bot = bot
        self.db = DBManager(get_config().mongo).mongo

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @classmethod
    def _clip(cls, text) -> str:
        """Return *text* as a usable field value.

        Empty/None text becomes ``"N/A"`` (an empty field value is not
        sendable); text longer than ``_FIELD_LIMIT`` is shortened and
        suffixed with ``...``.
        """
        if not text:
            return "N/A"
        if len(text) > cls._FIELD_LIMIT:
            return text[: cls._FIELD_LIMIT - 3] + "..."
        return text

    def _modlog_channel(self, guild):
        """Return the configured modlog channel for *guild*, or ``None``.

        ``None`` is returned when *guild* itself is ``None`` (e.g. a DM),
        when no ``modlog`` setting exists, or when the stored channel id
        does not resolve to a channel.
        """
        if guild is None:
            return None
        # NOTE(review): this is a synchronous pymongo call inside an async
        # listener; it blocks the event loop for the duration of the query.
        setting = self.db.jarvis.settings.find_one(
            {"guild": guild.id, "setting": "modlog"}
        )
        if not setting:
            return None
        return guild.get_channel(setting["value"])

    async def _fetch_latest_log(self, guild, action, target):
        """Fetch recent audit-log entries of *action* and pick *target*'s.

        Returns the matching ``discord.AuditLogEntry`` or ``None`` when no
        sufficiently recent entry for *target* exists.
        """
        entries = await guild.audit_logs(
            limit=50,
            action=action,
            after=datetime.utcnow() - timedelta(seconds=15),
            oldest_first=False,
        ).flatten()
        return self.get_latest_log(entries, target)

    def _resolve_admin(self, guild, log, collection, query):
        """Return the moderator responsible for *log*.

        When the audit-log entry was created by the bot itself (its user id
        equals the configured ``client_id``), the most recent document
        matching *query* in *collection* is consulted and its ``admin`` id
        is resolved to a guild member. If no document or member is found,
        the audit-log user is returned unchanged.
        """
        admin = log.user
        if getattr(admin, "id", None) != get_config().client_id:
            return admin
        record = collection.find_one(
            query, sort=[("time", pymongo.DESCENDING)]
        )
        if record:
            member = guild.get_member(record["admin"])
            if member is not None:
                return member
        return admin

    def get_latest_log(self, auditlog, target):
        """Return the first entry in *auditlog* that targets *target*.

        Only entries created within the last 10 seconds are considered.
        Returns ``None`` when nothing matches.
        """
        cutoff = datetime.utcnow() - timedelta(seconds=10)
        return find(
            lambda x: getattr(x.target, "id", None) == target.id
            and x.created_at > cutoff,
            auditlog,
        )

    async def modlog_embed(
        self,
        member: discord.Member,
        admin: discord.Member,
        log: discord.AuditLogEntry,
        title: str,
        desc: str,
    ) -> discord.Embed:
        """Build the standard moderation embed.

        :param member: The user the action was applied to (author/footer).
        :param admin: The responsible moderator; the "Moderator" field is
            omitted when this is ``None``.
        :param log: Audit-log entry supplying the reason and timestamp.
        :param title: Embed title.
        :param desc: Embed description.
        """
        fields = []
        if admin is not None:
            fields.append(
                Field(
                    name="Moderator",
                    value=f"{admin.mention} ({admin.name}"
                    + f"#{admin.discriminator})",
                )
            )
        if log.reason:
            fields.append(Field(name="Reason", value=log.reason, inline=False))
        embed = build_embed(
            title=title,
            description=desc,
            color="#fc9e3f",
            fields=fields,
            timestamp=log.created_at,
        )
        embed.set_author(
            name=f"{member.name}",
            icon_url=member.avatar_url,
        )
        embed.set_footer(
            text=f"{member.name}#{member.discriminator} | {member.id}"
        )
        return embed

    # ------------------------------------------------------------------
    # Ban / unban / kick
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_member_ban(self, guild: discord.Guild, user: discord.User):
        """Log a ban to the guild's modlog channel."""
        channel = self._modlog_channel(guild)
        if channel is None:
            return
        await asyncio.sleep(0.5)  # Need to wait for audit log
        log = await self._fetch_latest_log(
            guild, discord.AuditLogAction.ban, user
        )
        if log is None:
            return
        admin = self._resolve_admin(
            guild,
            log,
            self.db.jarvis.bans,
            {"guild": guild.id, "user": user.id, "active": True},
        )
        embed = await self.modlog_embed(
            user,
            admin,
            log,
            "User banned",
            f"{user.mention} was banned from {guild.name}",
        )
        await channel.send(embed=embed)

    @commands.Cog.listener()
    async def on_member_unban(self, guild: discord.Guild, user: discord.User):
        """Log an unban to the guild's modlog channel."""
        channel = self._modlog_channel(guild)
        if channel is None:
            return
        await asyncio.sleep(0.5)  # Need to wait for audit log
        log = await self._fetch_latest_log(
            guild, discord.AuditLogAction.unban, user
        )
        if log is None:
            return
        # NOTE(review): the query still requires "active": True; if the
        # unban command deactivates the ban record first this finds
        # nothing and the bot is reported as moderator -- confirm.
        admin = self._resolve_admin(
            guild,
            log,
            self.db.jarvis.bans,
            {"guild": guild.id, "user": user.id, "active": True},
        )
        embed = await self.modlog_embed(
            user,
            admin,
            log,
            "User unbanned",
            f"{user.mention} was unbanned from {guild.name}",
        )
        await channel.send(embed=embed)

    @commands.Cog.listener()
    async def on_member_remove(self, user: discord.Member):
        """Log a kick to the guild's modlog channel.

        Nothing is logged when no recent kick entry exists for the member
        (no matching audit-log entry was found).
        """
        guild = user.guild
        channel = self._modlog_channel(guild)
        if channel is None:
            return
        await asyncio.sleep(0.5)  # Need to wait for audit log
        log = await self._fetch_latest_log(
            guild, discord.AuditLogAction.kick, user
        )
        if log is None:
            return
        admin = self._resolve_admin(
            guild,
            log,
            self.db.jarvis.kicks,
            {"guild": guild.id, "user": user.id},
        )
        embed = await self.modlog_embed(
            user,
            admin,
            log,
            "User Kicked",
            f"{user.mention} was kicked from {guild.name}",
        )
        await channel.send(embed=embed)

    # ------------------------------------------------------------------
    # Role-based member updates
    # ------------------------------------------------------------------

    async def process_mute(self, before, after) -> Optional[discord.Embed]:
        """Build the "User Muted" embed, or ``None`` without a log entry."""
        log = await self._fetch_latest_log(
            before.guild, discord.AuditLogAction.member_role_update, before
        )
        if log is None:
            return None
        admin = self._resolve_admin(
            before.guild,
            log,
            self.db.jarvis.mutes,
            {"guild": before.guild.id, "user": before.id, "active": True},
        )
        return await self.modlog_embed(
            member=before,
            admin=admin,
            log=log,
            title="User Muted",
            desc=f"{before.mention} was muted",
        )

    async def process_unmute(self, before, after) -> Optional[discord.Embed]:
        """Build the "User Unmuted" embed, or ``None`` without a log entry."""
        log = await self._fetch_latest_log(
            before.guild, discord.AuditLogAction.member_role_update, before
        )
        if log is None:
            return None
        # NOTE(review): as with unbans, the "active": True filter may no
        # longer match once the mute has been lifted -- confirm.
        admin = self._resolve_admin(
            before.guild,
            log,
            self.db.jarvis.mutes,
            {"guild": before.guild.id, "user": before.id, "active": True},
        )
        return await self.modlog_embed(
            member=before,
            admin=admin,
            log=log,
            title="User Unmuted",
            desc=f"{before.mention} was unmuted",
        )

    async def process_verify(self, before, after) -> Optional[discord.Embed]:
        """Build the "User Verified" embed, or ``None`` without a log entry."""
        log = await self._fetch_latest_log(
            before.guild, discord.AuditLogAction.member_role_update, before
        )
        if log is None:
            return None
        return await self.modlog_embed(
            member=before,
            admin=log.user,
            log=log,
            title="User Verified",
            desc=f"{before.mention} was verified",
        )

    async def process_rolechange(
        self, before, after
    ) -> Optional[discord.Embed]:
        """Build the embed for a gained or lost role.

        Returns ``None`` when no matching audit-log entry is found. The
        role is shown as redacted text if it cannot be determined.
        """
        log = await self._fetch_latest_log(
            before.guild, discord.AuditLogAction.member_role_update, before
        )
        if log is None:
            return None
        role = None
        title = "User Given Role"
        verb = "was given"
        if len(before.roles) > len(after.roles):
            title = "User Forfeited Role"
            verb = "forfeited"
            role = find(lambda x: x not in after.roles, before.roles)
        elif len(before.roles) < len(after.roles):
            role = find(lambda x: x not in before.roles, after.roles)
        role_text = role.mention if role else "||`[redacted]`||"
        return await self.modlog_embed(
            member=before,
            admin=log.user,
            log=log,
            title=title,
            desc=f"{before.mention} {verb} role {role_text}",
        )

    async def _nick_change_embed(self, before, after) -> discord.Embed:
        """Build the "User Nick Changed" embed.

        The moderator and reason fields are only added when a matching
        audit-log entry exists; without one the current UTC time is used
        as the embed timestamp.
        """
        log = await self._fetch_latest_log(
            before.guild, discord.AuditLogAction.member_update, before
        )
        bname = before.nick if before.nick else before.name
        aname = after.nick if after.nick else after.name
        fields = [
            Field(
                name="Before",
                value=f"{bname} ({before.name}" + f"#{before.discriminator})",
            ),
            Field(
                name="After",
                value=f"{aname} ({after.name}" + f"#{after.discriminator})",
            ),
        ]
        if log is not None:
            moderator = log.user
            # Only name a moderator when someone else changed the nick.
            if moderator is not None and moderator.id != before.id:
                fields.append(
                    Field(
                        name="Moderator",
                        value=f"{moderator.mention} ({moderator.name}"
                        + f"#{moderator.discriminator})",
                    )
                )
            if log.reason:
                fields.append(
                    Field(name="Reason", value=log.reason, inline=False),
                )
        embed = build_embed(
            title="User Nick Changed",
            description=f"{after.mention} changed their nickname",
            color="#fc9e3f",
            fields=fields,
            timestamp=log.created_at if log is not None else datetime.utcnow(),
        )
        embed.set_author(
            name=f"{after.name}",
            icon_url=after.avatar_url,
        )
        embed.set_footer(
            text=f"{after.name}#{after.discriminator} | {after.id}"
        )
        return embed

    @commands.Cog.listener()
    async def on_member_update(
        self, before: discord.Member, after: discord.Member
    ):
        """Log mutes, unmutes, verification, nick and role changes.

        Exactly one kind of change is reported per event, checked in that
        order. Mute/unmute/verify are detected by the configured role
        actually being added or removed between *before* and *after*.
        """
        guild = after.guild
        channel = self._modlog_channel(guild)
        if channel is None:
            return
        await asyncio.sleep(0.5)  # Need to wait for audit log

        mute = self.db.jarvis.settings.find_one(
            {"guild": guild.id, "setting": "mute"}
        )
        verified = self.db.jarvis.settings.find_one(
            {"guild": guild.id, "setting": "verified"}
        )
        mute_role = guild.get_role(mute["value"]) if mute else None
        verified_role = guild.get_role(verified["value"]) if verified else None

        embed = None
        if (
            mute_role is not None
            and mute_role in after.roles
            and mute_role not in before.roles
        ):
            # The mute role must be newly added; merely being muted while
            # something else changes is not a new mute.
            embed = await self.process_mute(before, after)
        elif (
            mute_role is not None
            and mute_role in before.roles
            and mute_role not in after.roles
        ):
            embed = await self.process_unmute(before, after)
        elif (
            verified_role is not None
            and verified_role not in before.roles
            and verified_role in after.roles
        ):
            embed = await self.process_verify(before, after)
        elif before.nick != after.nick:
            embed = await self._nick_change_embed(before, after)
        elif len(before.roles) != len(after.roles):
            # TODO: User got a new role
            embed = await self.process_rolechange(before, after)

        if embed:
            await channel.send(embed=embed)

    # ------------------------------------------------------------------
    # Messages
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_message_edit(
        self, before: discord.Message, after: discord.Message
    ):
        """Log an edited message (old and new content) to the modlog.

        Edits by the bot itself, edits that do not change the content and
        messages outside a guild are ignored.
        """
        # Compare ids: comparing the author object to an int never matches.
        if before.author.id == self.bot.user.id:
            return
        if before.content == after.content or before.content is None:
            return
        channel = self._modlog_channel(after.guild)
        if channel is None:
            return
        fields = [
            Field("Original Message", self._clip(before.content), False),
            Field("New Message", self._clip(after.content), False),
        ]
        embed = build_embed(
            title="Message Edited",
            description=f"{before.author.mention} edited a message",
            fields=fields,
            color="#fc9e3f",
            timestamp=after.edited_at,
            url=after.jump_url,
        )
        embed.set_author(
            name=before.author.name,
            icon_url=before.author.avatar_url,
            url=after.jump_url,
        )
        embed.set_footer(
            text=f"{before.author.name}#{before.author.discriminator}"
            + f" | {before.author.id}"
        )
        await channel.send(embed=embed)

    @commands.Cog.listener()
    async def on_message_delete(self, message: discord.Message):
        """Log a deleted message to the modlog.

        Messages outside a guild are ignored; empty content (for example
        an attachment-only message) is shown as ``N/A``.
        """
        channel = self._modlog_channel(message.guild)
        if channel is None:
            return
        fields = [
            Field("Original Message", self._clip(message.content), False)
        ]
        embed = build_embed(
            title="Message Deleted",
            description=f"{message.author.mention}'s message was deleted",
            fields=fields,
            color="#fc9e3f",
        )
        embed.set_author(
            name=message.author.name,
            icon_url=message.author.avatar_url,
            url=message.jump_url,
        )
        embed.set_footer(
            text=f"{message.author.name}#{message.author.discriminator}"
            + f" | {message.author.id}"
        )
        await channel.send(embed=embed)

    # ------------------------------------------------------------------
    # Slash commands
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_slash_command(self, ctx: SlashContext):
        """Log an invoked slash command with its arguments to the modlog.

        Commands invoked in a DM channel are ignored.
        """
        if isinstance(ctx.channel, DMChannel):
            return
        channel = self._modlog_channel(ctx.guild)
        if channel is None:
            return
        fields = [Field("Command", ctx.name)]
        if ctx.subcommand_name:
            fields.append(Field("Subcommand", ctx.subcommand_name))
        if ctx.args:
            # Arguments are not necessarily strings; stringify before join.
            args_string = " ".join(str(arg) for arg in ctx.args)
            fields.append(Field("Args", self._clip(args_string), False))
        if ctx.kwargs:
            kwargs_string = " ".join(
                f"{key}: {value}" for key, value in ctx.kwargs.items()
            )
            fields.append(
                Field("Keyword Args", self._clip(kwargs_string), False)
            )
        embed = build_embed(
            title="Command Invoked",
            description=f"{ctx.author.mention} invoked a command",
            fields=fields,
            color="#fc9e3f",
        )
        embed.set_author(
            name=ctx.author.name,
            icon_url=ctx.author.avatar_url,
        )
        embed.set_footer(
            text=f"{ctx.author.name}#{ctx.author.discriminator}"
            + f" | {ctx.author.id}"
        )
        await channel.send(embed=embed)
|
|
|
|
|
|
def setup(bot):
    """Extension entry point: attach a ``ModlogCog`` instance to *bot*."""
    modlog_cog = ModlogCog(bot)
    bot.add_cog(modlog_cog)
|