210 lines
8 KiB
Python
210 lines
8 KiB
Python
"""J.A.R.V.I.S. Message event handler."""
|
|
import re
|
|
|
|
from discord import DMChannel, Message
|
|
from discord.ext.commands import Bot
|
|
from discord.utils import find
|
|
|
|
from jarvis.config import get_config
|
|
from jarvis.db.models import Autopurge, Autoreact, Roleping, Setting, Warning
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.field import Field
|
|
|
|
invites = re.compile(
|
|
r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)",
|
|
flags=re.IGNORECASE,
|
|
)
|
|
|
|
|
|
class MessageEventHandler(object):
|
|
"""J.A.R.V.I.S. Message event handler."""
|
|
|
|
def __init__(self, bot: Bot):
|
|
self.bot = bot
|
|
self.bot.add_listener(self.on_message)
|
|
self.bot.add_listener(self.on_message_edit)
|
|
|
|
async def autopurge(self, message: Message) -> None:
|
|
"""Handle autopurge events."""
|
|
autopurge = Autopurge.objects(guild=message.guild.id, channel=message.channel.id).first()
|
|
if autopurge:
|
|
await message.delete(delay=autopurge.delay)
|
|
|
|
async def autoreact(self, message: Message) -> None:
|
|
"""Handle autoreact events."""
|
|
autoreact = Autoreact.objects(
|
|
guild=message.guild.id,
|
|
channel=message.channel.id,
|
|
).first()
|
|
if autoreact:
|
|
for reaction in autoreact.reactions:
|
|
await message.add_reaction(reaction)
|
|
|
|
async def checks(self, message: Message) -> None:
|
|
"""Other message checks."""
|
|
# #tech
|
|
channel = find(lambda x: x.id == 599068193339736096, message.channel_mentions)
|
|
if channel and message.author.id == 293795462752894976:
|
|
await channel.send(
|
|
content="https://cdn.discordapp.com/attachments/664621130044407838/805218508866453554/tech.gif"
|
|
)
|
|
content = re.sub(r"\s+", "", message.content)
|
|
match = invites.search(content)
|
|
setting = Setting.objects(guild=message.guild.id, setting="noinvite").first()
|
|
if not setting:
|
|
setting = Setting(guild=message.guild.id, setting="noinvite", value=True)
|
|
setting.save()
|
|
if match:
|
|
guild_invites = await message.guild.invites()
|
|
allowed = [x.code for x in guild_invites] + [
|
|
"dbrand",
|
|
"VtgZntXcnZ",
|
|
"gPfYGbvTCE",
|
|
]
|
|
if match.group(1) not in allowed and setting.value:
|
|
await message.delete()
|
|
_ = Warning(
|
|
active=True,
|
|
admin=get_config().client_id,
|
|
duration=24,
|
|
guild=message.guild.id,
|
|
reason="Sent an invite link",
|
|
user=message.author.id,
|
|
).save()
|
|
fields = [
|
|
Field(
|
|
"Reason",
|
|
"Sent an invite link",
|
|
False,
|
|
)
|
|
]
|
|
embed = build_embed(
|
|
title="Warning",
|
|
description=f"{message.author.mention} has been warned",
|
|
fields=fields,
|
|
)
|
|
embed.set_author(
|
|
name=message.author.nick if message.author.nick else message.author.name,
|
|
icon_url=message.author.avatar_url,
|
|
)
|
|
embed.set_footer(text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}")
|
|
await message.channel.send(embed=embed)
|
|
|
|
async def massmention(self, message: Message) -> None:
|
|
"""Handle massmention events."""
|
|
massmention = Setting.objects(
|
|
guild=message.guild.id,
|
|
setting="massmention",
|
|
).first()
|
|
if (
|
|
massmention
|
|
and massmention.value > 0 # noqa: W503
|
|
and len(message.mentions) - (1 if message.author in message.mentions else 0) # noqa: W503
|
|
> massmention.value # noqa: W503
|
|
):
|
|
_ = Warning(
|
|
active=True,
|
|
admin=get_config().client_id,
|
|
duration=24,
|
|
guild=message.guild.id,
|
|
reason="Mass Mention",
|
|
user=message.author.id,
|
|
).save()
|
|
fields = [Field("Reason", "Mass Mention", False)]
|
|
embed = build_embed(
|
|
title="Warning",
|
|
description=f"{message.author.mention} has been warned",
|
|
fields=fields,
|
|
)
|
|
embed.set_author(
|
|
name=message.author.nick if message.author.nick else message.author.name,
|
|
icon_url=message.author.avatar_url,
|
|
)
|
|
embed.set_footer(text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}")
|
|
await message.channel.send(embed=embed)
|
|
|
|
async def roleping(self, message: Message) -> None:
|
|
"""Handle roleping events."""
|
|
rolepings = Roleping.objects(guild=message.guild.id, active=True)
|
|
|
|
if not rolepings:
|
|
return
|
|
|
|
# Get all role IDs involved with message
|
|
roles = []
|
|
for mention in message.role_mentions:
|
|
roles.append(mention.id)
|
|
for mention in message.mentions:
|
|
for role in mention.roles:
|
|
roles.append(role.id)
|
|
|
|
if not roles:
|
|
return
|
|
|
|
# Get all roles that are rolepinged
|
|
roleping_ids = [r.role for r in rolepings]
|
|
|
|
# Get roles in rolepings
|
|
role_in_rolepings = list(filter(lambda x: x in roleping_ids, roles))
|
|
|
|
# Check if the user has the role, so they are allowed to ping it
|
|
user_missing_role = any(x.id not in roleping_ids for x in message.author.roles)
|
|
|
|
# Admins can ping whoever
|
|
user_is_admin = message.author.guild_permissions.administrator
|
|
|
|
# Check if user in a bypass list
|
|
user_has_bypass = False
|
|
for roleping in rolepings:
|
|
if message.author.id in roleping.bypass["users"]:
|
|
user_has_bypass = True
|
|
break
|
|
if any(role.id in roleping.bypass["roles"] for role in message.author.roles):
|
|
user_has_bypass = True
|
|
break
|
|
|
|
if role_in_rolepings and user_missing_role and not user_is_admin and not user_has_bypass:
|
|
_ = Warning(
|
|
active=True,
|
|
admin=get_config().client_id,
|
|
duration=24,
|
|
guild=message.guild.id,
|
|
reason="Pinged a blocked role/user with a blocked role",
|
|
user=message.author.id,
|
|
).save()
|
|
fields = [
|
|
Field(
|
|
"Reason",
|
|
"Pinged a blocked role/user with a blocked role",
|
|
False,
|
|
)
|
|
]
|
|
embed = build_embed(
|
|
title="Warning",
|
|
description=f"{message.author.mention} has been warned",
|
|
fields=fields,
|
|
)
|
|
embed.set_author(
|
|
name=message.author.nick if message.author.nick else message.author.name,
|
|
icon_url=message.author.avatar_url,
|
|
)
|
|
embed.set_footer(text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}")
|
|
await message.channel.send(embed=embed)
|
|
|
|
async def on_message(self, message: Message) -> None:
|
|
"""Handle on_message event. Calls other event handlers."""
|
|
if not isinstance(message.channel, DMChannel) and not message.author.bot:
|
|
await self.autoreact(message)
|
|
await self.massmention(message)
|
|
await self.roleping(message)
|
|
await self.autopurge(message)
|
|
await self.checks(message)
|
|
|
|
async def on_message_edit(self, before: Message, after: Message) -> None:
|
|
"""Handle on_message_edit event. Calls other event handlers."""
|
|
if not isinstance(after.channel, DMChannel) and not after.author.bot:
|
|
await self.massmention(after)
|
|
await self.roleping(after)
|
|
await self.checks(after)
|
|
await self.roleping(after)
|
|
await self.checks(after)
|