434 lines
18 KiB
Python
434 lines
18 KiB
Python
"""JARVIS message event mixin"""
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from aiohttp import ClientSession
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Autopurge, Autoreact, Roleping, Setting, Warning
|
|
from jarvis_core.filters import invites, url
|
|
from naff import listen
|
|
from naff.api.events.discord import MessageCreate, MessageDelete, MessageUpdate
|
|
from naff.client.utils.misc_utils import find_all
|
|
from naff.models.discord.channel import DMChannel
|
|
from naff.models.discord.embed import EmbedField
|
|
from naff.models.discord.enums import Permissions
|
|
from naff.models.discord.message import Message
|
|
|
|
from jarvis.tracking import malicious_tracker, warnings_tracker
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.embeds import warning_embed
|
|
|
|
|
|
class MessageEventMixin:
|
|
# Message
|
|
async def autopurge(self, message: Message) -> None:
|
|
"""Handle autopurge events."""
|
|
autopurge = await Autopurge.find_one(q(guild=message.guild.id, channel=message.channel.id))
|
|
if autopurge:
|
|
if not message.author.has_permission(Permissions.ADMINISTRATOR):
|
|
self.logger.debug(
|
|
f"Autopurging message {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
await message.delete(delay=autopurge.delay)
|
|
|
|
async def autoreact(self, message: Message) -> None:
|
|
"""Handle autoreact events."""
|
|
autoreact = await Autoreact.find_one(
|
|
q(
|
|
guild=message.guild.id,
|
|
channel=message.channel.id,
|
|
)
|
|
)
|
|
if autoreact:
|
|
self.logger.debug(
|
|
f"Autoreacting to message {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
for reaction in autoreact.reactions:
|
|
await message.add_reaction(reaction)
|
|
if autoreact.thread:
|
|
name = message.content.replace("\n", " ")
|
|
name = re.sub(r"<:\w+:(\d+)>", "", name)
|
|
if len(name) > 100:
|
|
name = name[:97] + "..."
|
|
await message.create_thread(name=message.content, reason="Autoreact")
|
|
|
|
async def checks(self, message: Message) -> None:
|
|
"""Other message checks."""
|
|
# #tech
|
|
# channel = find(lambda x: x.id == 599068193339736096, message._mention_ids)
|
|
# if channel and message.author.id == 293795462752894976:
|
|
# await channel.send(
|
|
# content="https://cdn.discordapp.com/attachments/664621130044407838/805218508866453554/tech.gif" # noqa: E501
|
|
# )
|
|
content = re.sub(r"\s+", "", message.content)
|
|
match = invites.search(content)
|
|
setting = await Setting.find_one(q(guild=message.guild.id, setting="noinvite"))
|
|
if not setting:
|
|
setting = Setting(guild=message.guild.id, setting="noinvite", value=True)
|
|
await setting.commit()
|
|
if match:
|
|
guild_invites = [x.code for x in await message.guild.fetch_invites()]
|
|
if message.guild.vanity_url_code:
|
|
guild_invites.append(message.guild.vanity_url_code)
|
|
allowed = guild_invites + [
|
|
"dbrand",
|
|
"VtgZntXcnZ",
|
|
"gPfYGbvTCE",
|
|
]
|
|
if (m := match.group(1)) not in allowed and setting.value:
|
|
self.logger.debug(f"Removing non-allowed invite `{m}` from {message.guild.id}")
|
|
try:
|
|
await message.delete()
|
|
except Exception:
|
|
self.logger.debug("Message deleted before action taken")
|
|
|
|
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
|
await Warning(
|
|
active=True,
|
|
admin=self.user.id,
|
|
duration=24,
|
|
expires_at=expires_at,
|
|
guild=message.guild.id,
|
|
reason="Sent an invite link",
|
|
user=message.author.id,
|
|
).commit()
|
|
tracker = warnings_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
embed = warning_embed(message.author, "Sent an invite link")
|
|
try:
|
|
await message.channel.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.warn("Failed to send warning embed")
|
|
|
|
async def massmention(self, message: Message) -> None:
|
|
"""Handle massmention events."""
|
|
massmention = await Setting.find_one(
|
|
q(
|
|
guild=message.guild.id,
|
|
setting="massmention",
|
|
)
|
|
)
|
|
|
|
if (
|
|
massmention
|
|
and massmention.value > 0 # noqa: W503
|
|
and len(message._mention_ids + message._mention_roles) # noqa: W503
|
|
- (1 if message.author.id in message._mention_ids else 0) # noqa: W503
|
|
> massmention.value # noqa: W503
|
|
):
|
|
self.logger.debug(
|
|
f"Massmention threshold on {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
|
await Warning(
|
|
active=True,
|
|
admin=self.user.id,
|
|
duration=24,
|
|
expires_at=expires_at,
|
|
guild=message.guild.id,
|
|
reason="Mass Mention",
|
|
user=message.author.id,
|
|
).commit()
|
|
tracker = warnings_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
embed = warning_embed(message.author, "Mass Mention")
|
|
try:
|
|
await message.channel.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.warn("Failed to send warning embed")
|
|
|
|
async def roleping(self, message: Message) -> None:
|
|
"""Handle roleping events."""
|
|
try:
|
|
if message.author.has_permission(Permissions.MANAGE_GUILD):
|
|
return
|
|
except Exception as e:
|
|
self.logger.error("Failed to get permissions, pretending check failed", exc_info=e)
|
|
if await Roleping.collection.count_documents(q(guild=message.guild.id, active=True)) == 0:
|
|
return
|
|
rolepings = await Roleping.find(q(guild=message.guild.id, active=True)).to_list(None)
|
|
|
|
# Get all role IDs involved with message
|
|
roles = [x.id async for x in message.mention_roles]
|
|
async for mention in message.mention_users:
|
|
roles += [x.id for x in mention.roles]
|
|
|
|
if not roles:
|
|
return
|
|
|
|
# Get all roles that are rolepinged
|
|
roleping_ids = [r.role for r in rolepings]
|
|
|
|
# Get roles in rolepings
|
|
role_in_rolepings = find_all(lambda x: x in roleping_ids, roles)
|
|
|
|
# Check if the user has the role, so they are allowed to ping it
|
|
user_missing_role = any(x.id not in roleping_ids for x in message.author.roles)
|
|
|
|
# Admins can ping whoever
|
|
user_is_admin = message.author.has_permission(Permissions.ADMINISTRATOR)
|
|
|
|
# Check if user in a bypass list
|
|
def check_has_role(roleping: Roleping) -> bool:
|
|
return any(role.id in roleping.bypass["roles"] for role in message.author.roles)
|
|
|
|
user_has_bypass = False
|
|
for roleping in rolepings:
|
|
if message.author.id in roleping.bypass["users"]:
|
|
user_has_bypass = True
|
|
break
|
|
if check_has_role(roleping):
|
|
user_has_bypass = True
|
|
break
|
|
|
|
if role_in_rolepings and user_missing_role and not user_is_admin and not user_has_bypass:
|
|
self.logger.debug(
|
|
f"Rolepinged role in {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
|
await Warning(
|
|
active=True,
|
|
admin=self.user.id,
|
|
duration=24,
|
|
expires_at=expires_at,
|
|
guild=message.guild.id,
|
|
reason="Pinged a blocked role/user with a blocked role",
|
|
user=message.author.id,
|
|
).commit()
|
|
tracker = warnings_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
embed = warning_embed(message.author, "Pinged a blocked role/user with a blocked role")
|
|
try:
|
|
await message.channel.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.warn("Failed to send warning embed")
|
|
|
|
async def phishing(self, message: Message) -> None:
|
|
"""Check if the message contains any known phishing domains."""
|
|
for match in url.finditer(message.content):
|
|
if (m := match.group("domain")) in self.phishing_domains:
|
|
self.logger.debug(
|
|
f"Phishing url `{m}` detected in {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
|
await Warning(
|
|
active=True,
|
|
admin=self.user.id,
|
|
duration=24,
|
|
expires_at=expires_at,
|
|
guild=message.guild.id,
|
|
reason="Phishing URL",
|
|
user=message.author.id,
|
|
).commit()
|
|
tracker = warnings_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
embed = warning_embed(message.author, "Phishing URL")
|
|
try:
|
|
await message.channel.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.warn("Failed to send warning embed")
|
|
try:
|
|
await message.delete()
|
|
except Exception:
|
|
self.logger.warn("Failed to delete malicious message")
|
|
tracker = malicious_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
return True
|
|
return False
|
|
|
|
async def malicious_url(self, message: Message) -> None:
|
|
"""Check if the message contains any known phishing domains."""
|
|
for match in url.finditer(message.content):
|
|
async with ClientSession() as session:
|
|
resp = await session.post(
|
|
"https://anti-fish.bitflow.dev/check",
|
|
json={"message": match.string},
|
|
headers={
|
|
"Application-Name": "JARVIS",
|
|
"Application-Link": "https://git.zevaryx.com/stark-industries/jarvis",
|
|
},
|
|
)
|
|
if resp.status != 200:
|
|
break
|
|
data = await resp.json()
|
|
if data["match"]:
|
|
self.logger.debug(
|
|
f"Scam url `{match.string}` detected in {message.guild.id}/{message.channel.id}/{message.id}"
|
|
)
|
|
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
|
|
await Warning(
|
|
active=True,
|
|
admin=self.user.id,
|
|
duration=24,
|
|
expires_at=expires_at,
|
|
guild=message.guild.id,
|
|
reason="Unsafe URL",
|
|
user=message.author.id,
|
|
).commit()
|
|
tracker = warnings_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
reasons = ", ".join(f"{m['source']}: {m['type']}" for m in data["matches"])
|
|
embed = warning_embed(message.author, reasons)
|
|
try:
|
|
await message.channel.send(embeds=embed)
|
|
except Exception:
|
|
self.logger.warn("Failed to send warning embed")
|
|
try:
|
|
await message.delete()
|
|
except Exception:
|
|
self.logger.warn("Failed to delete malicious message")
|
|
tracker = malicious_tracker.labels(
|
|
guild_id=message.guild.id, guild_name=message.guild.name
|
|
)
|
|
tracker.inc()
|
|
return True
|
|
return False
|
|
|
|
@listen()
|
|
async def on_message(self, event: MessageCreate) -> None:
|
|
"""Handle on_message event. Calls other event handlers."""
|
|
message = event.message
|
|
if not isinstance(message.channel, DMChannel) and not message.author.bot:
|
|
await self.autoreact(message)
|
|
await self.massmention(message)
|
|
await self.roleping(message)
|
|
await self.autopurge(message)
|
|
await self.checks(message)
|
|
if not await self.phishing(message):
|
|
await self.malicious_url(message)
|
|
|
|
@listen()
|
|
async def on_message_edit(self, event: MessageUpdate) -> None:
|
|
"""Process on_message_edit events."""
|
|
before = event.before
|
|
after = event.after
|
|
if not after.author.bot:
|
|
modlog = await Setting.find_one(q(guild=after.guild.id, setting="activitylog"))
|
|
ignore = await Setting.find_one(q(guild=after.guild.id, setting="log_ignore"))
|
|
if modlog and (not ignore or (ignore and after.channel.id not in ignore.value)):
|
|
if not before or before.content == after.content or before.content is None:
|
|
return
|
|
try:
|
|
channel = before.guild.get_channel(modlog.value)
|
|
fields = [
|
|
EmbedField(
|
|
"Original Message",
|
|
before.content if before.content else "N/A",
|
|
False,
|
|
),
|
|
EmbedField(
|
|
"New Message",
|
|
after.content if after.content else "N/A",
|
|
False,
|
|
),
|
|
]
|
|
embed = build_embed(
|
|
title="Message Edited",
|
|
description=f"{after.author.mention} edited a message in {before.channel.mention}",
|
|
fields=fields,
|
|
color="#fc9e3f",
|
|
timestamp=after.edited_timestamp,
|
|
url=after.jump_url,
|
|
)
|
|
embed.set_author(
|
|
name=after.author.username,
|
|
icon_url=after.author.display_avatar.url,
|
|
url=after.jump_url,
|
|
)
|
|
embed.set_footer(
|
|
text=f"{after.author.username}#{after.author.discriminator} | {after.author.id}"
|
|
)
|
|
await channel.send(embeds=embed)
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
f"Failed to process edit {before.guild.id}/{before.channel.id}/{before.id}: {e}"
|
|
)
|
|
if not isinstance(after.channel, DMChannel) and not after.author.bot:
|
|
await self.massmention(after)
|
|
await self.roleping(after)
|
|
await self.checks(after)
|
|
await self.roleping(after)
|
|
await self.checks(after)
|
|
if not await self.phishing(after):
|
|
await self.malicious_url(after)
|
|
|
|
@listen()
|
|
async def on_message_delete(self, event: MessageDelete) -> None:
|
|
"""Process on_message_delete events."""
|
|
message = event.message
|
|
modlog = await Setting.find_one(q(guild=message.guild.id, setting="activitylog"))
|
|
ignore = await Setting.find_one(q(guild=message.guild.id, setting="log_ignore"))
|
|
if modlog and (not ignore or (ignore and message.channel.id not in ignore.value)):
|
|
try:
|
|
content = message.content or "N/A"
|
|
except AttributeError:
|
|
content = "N/A"
|
|
fields = [EmbedField("Original Message", content, False)]
|
|
|
|
try:
|
|
if message.attachments:
|
|
value = "\n".join([f"[{x.filename}]({x.url})" for x in message.attachments])
|
|
fields.append(
|
|
EmbedField(
|
|
name="Attachments",
|
|
value=value,
|
|
inline=False,
|
|
)
|
|
)
|
|
|
|
if message.sticker_items:
|
|
value = "\n".join([f"Sticker: {x.name}" for x in message.sticker_items])
|
|
fields.append(
|
|
EmbedField(
|
|
name="Stickers",
|
|
value=value,
|
|
inline=False,
|
|
)
|
|
)
|
|
|
|
if message.embeds:
|
|
value = str(len(message.embeds)) + " embeds"
|
|
fields.append(
|
|
EmbedField(
|
|
name="Embeds",
|
|
value=value,
|
|
inline=False,
|
|
)
|
|
)
|
|
|
|
channel = message.guild.get_channel(modlog.value)
|
|
embed = build_embed(
|
|
title="Message Deleted",
|
|
description=f"{message.author.mention}'s message was deleted from {message.channel.mention}",
|
|
fields=fields,
|
|
color="#fc9e3f",
|
|
)
|
|
|
|
embed.set_author(
|
|
name=message.author.username,
|
|
icon_url=message.author.display_avatar.url,
|
|
url=message.jump_url,
|
|
)
|
|
embed.set_footer(
|
|
text=(
|
|
f"{message.author.username}#{message.author.discriminator} | "
|
|
f"{message.author.id}"
|
|
)
|
|
)
|
|
await channel.send(embeds=embed)
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
f"Failed to process edit {message.guild.id}/{message.channel.id}/{message.id}: {e}"
|
|
)
|