Re-add phishing sync, fix all client events, dual layer phishing check
This commit is contained in:
parent
60ab06b544
commit
526038c725
1 changed files with 307 additions and 290 deletions
597
jarvis/client.py
597
jarvis/client.py
|
@ -6,13 +6,15 @@ from datetime import datetime
|
|||
from aiohttp import ClientSession
|
||||
from dis_snek import Snake, listen
|
||||
from dis_snek.api.events.discord import MessageCreate, MessageDelete, MessageUpdate
|
||||
from dis_snek.client.utils.misc_utils import find, find_all
|
||||
from dis_snek.client.utils.misc_utils import find_all
|
||||
from dis_snek.models.discord.channel import DMChannel
|
||||
from dis_snek.models.discord.embed import EmbedField
|
||||
from dis_snek.models.discord.enums import Permissions
|
||||
from dis_snek.models.discord.message import Message
|
||||
from dis_snek.models.discord.user import Member
|
||||
from dis_snek.models.snek.context import Context, InteractionContext
|
||||
from dis_snek.models.snek.tasks import Task, TimeTrigger
|
||||
from dis_snek.models.snek.tasks.task import Task
|
||||
from dis_snek.models.snek.tasks.triggers import IntervalTrigger
|
||||
from jarvis_core.db import q
|
||||
from jarvis_core.db.models import Autopurge, Autoreact, Roleping, Setting, Warning
|
||||
from jarvis_core.filters import invites, url
|
||||
|
@ -45,9 +47,9 @@ CMD_FMT = fmt(Fore.GREEN, Format.BOLD)
|
|||
class Jarvis(Snake):
|
||||
def __init__(self, *args, **kwargs): # noqa: ANN002 ANN003
|
||||
super().__init__(*args, **kwargs)
|
||||
self.phishing_domains = set()
|
||||
self.phishing_domains = []
|
||||
|
||||
@Task.create(TimeTrigger())
|
||||
@Task.create(IntervalTrigger(days=1))
|
||||
async def _update_domains(self) -> None:
|
||||
async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
|
||||
response = await session.get("https://phish.sinking.yachts/v2/recent/86415")
|
||||
|
@ -56,21 +58,23 @@ class Jarvis(Snake):
|
|||
|
||||
for update in data:
|
||||
if update["type"] == "add":
|
||||
self.phishing_domains.add(update["domain"])
|
||||
if update["domain"] not in self.phishing_domains:
|
||||
self.phishing_domains.append(update["domain"])
|
||||
elif update["type"] == "delete":
|
||||
self.phishing_domains.discard(update["domain"])
|
||||
if update["domain"] in self.phishing_domains:
|
||||
self.phishing_domains.remove(update["domain"])
|
||||
|
||||
async def _sync_domains(self) -> None:
|
||||
async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
|
||||
response = await session.get("https://phish.sinking.yachts/v2/all")
|
||||
response.raise_for_status()
|
||||
self.phishing_domains = set(await response.json())
|
||||
self.phishing_domains = await response.json()
|
||||
|
||||
@listen()
|
||||
async def on_ready(self) -> None:
|
||||
"""Lepton on_ready override."""
|
||||
# await self._sync_domains()
|
||||
# self._update_domains.start()
|
||||
await self._sync_domains()
|
||||
self._update_domains.start()
|
||||
print("Logged in as {}".format(self.user)) # noqa: T001
|
||||
print("Connected to {} guild(s)".format(len(self.guilds))) # noqa: T001
|
||||
print( # noqa: T001
|
||||
|
@ -144,18 +148,301 @@ class Jarvis(Snake):
|
|||
)
|
||||
embed.set_author(name=ctx.author.username, icon_url=ctx.author.display_avatar.url)
|
||||
embed.set_footer(
|
||||
text=f"{ctx.author.username}#{ctx.author.discriminator} | {ctx.author.id}"
|
||||
text=f"{ctx.author.user.username}#{ctx.author.discriminator} | {ctx.author.id}"
|
||||
)
|
||||
await channel.send(embed=embed)
|
||||
|
||||
# Events
|
||||
async def on_member_join(self, user: Member) -> None:
|
||||
"""Handle on_member_join event."""
|
||||
guild = user.guild
|
||||
unverified = await Setting.find_one(q(guild=guild.id, setting="unverified"))
|
||||
if unverified:
|
||||
role = guild.get_role(unverified.value)
|
||||
if role not in user.roles:
|
||||
await user.add_role(role, reason="User just joined and is unverified")
|
||||
|
||||
async def autopurge(self, message: Message) -> None:
|
||||
"""Handle autopurge events."""
|
||||
autopurge = await Autopurge.find_one(q(guild=message.guild.id, channel=message.channel.id))
|
||||
if autopurge:
|
||||
await message.delete(delay=autopurge.delay)
|
||||
|
||||
async def autoreact(self, message: Message) -> None:
|
||||
"""Handle autoreact events."""
|
||||
autoreact = await Autoreact.find_one(
|
||||
q(
|
||||
guild=message.guild.id,
|
||||
channel=message.channel.id,
|
||||
)
|
||||
)
|
||||
if autoreact:
|
||||
for reaction in autoreact.reactions:
|
||||
await message.add_reaction(reaction)
|
||||
|
||||
async def checks(self, message: Message) -> None:
|
||||
"""Other message checks."""
|
||||
# #tech
|
||||
# channel = find(lambda x: x.id == 599068193339736096, message._mention_ids)
|
||||
# if channel and message.author.id == 293795462752894976:
|
||||
# await channel.send(
|
||||
# content="https://cdn.discordapp.com/attachments/664621130044407838/805218508866453554/tech.gif" # noqa: E501
|
||||
# )
|
||||
content = re.sub(r"\s+", "", message.content)
|
||||
match = invites.search(content)
|
||||
setting = await Setting.find_one(q(guild=message.guild.id, setting="noinvite"))
|
||||
if not setting:
|
||||
setting = Setting(guild=message.guild.id, setting="noinvite", value=True)
|
||||
await setting.commit()
|
||||
if match:
|
||||
guild_invites = await message.guild.invites()
|
||||
guild_invites.append(message.guild.vanity_url_code)
|
||||
allowed = [x.code for x in guild_invites] + [
|
||||
"dbrand",
|
||||
"VtgZntXcnZ",
|
||||
"gPfYGbvTCE",
|
||||
]
|
||||
if match.group(1) not in allowed and setting.value:
|
||||
await message.delete()
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Sent an invite link",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [
|
||||
EmbedField(
|
||||
name="Reason",
|
||||
value="Sent an invite link",
|
||||
inline=False,
|
||||
)
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}" # noqa: E501
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def massmention(self, message: Message) -> None:
|
||||
"""Handle massmention events."""
|
||||
massmention = await Setting.find_one(
|
||||
q(
|
||||
guild=message.guild.id,
|
||||
setting="massmention",
|
||||
)
|
||||
)
|
||||
|
||||
if (
|
||||
massmention
|
||||
and massmention.value > 0 # noqa: W503
|
||||
and len(message._mention_ids + message._mention_roles) # noqa: W503
|
||||
- (1 if message.author in message._mention_ids else 0) # noqa: W503
|
||||
> massmention.value # noqa: W503
|
||||
):
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Mass Mention",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [EmbedField(name="Reason", value="Mass Mention", inline=False)]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def roleping(self, message: Message) -> None:
|
||||
"""Handle roleping events."""
|
||||
if await Roleping.collection.count_documents(q(guild=message.guild.id, active=True)) == 0:
|
||||
return
|
||||
rolepings = Roleping.find(q(guild=message.guild.id, active=True))
|
||||
|
||||
# Get all role IDs involved with message
|
||||
roles = []
|
||||
async for mention in message.mention_roles:
|
||||
roles.append(mention.id)
|
||||
async for mention in message.mention_users:
|
||||
for role in mention.roles:
|
||||
roles.append(role.id)
|
||||
|
||||
if not roles:
|
||||
return
|
||||
|
||||
# Get all roles that are rolepinged
|
||||
roleping_ids = [r.role for r in rolepings]
|
||||
|
||||
# Get roles in rolepings
|
||||
role_in_rolepings = find_all(lambda x: x in roleping_ids, roles)
|
||||
|
||||
# Check if the user has the role, so they are allowed to ping it
|
||||
user_missing_role = any(x.id not in roleping_ids for x in message.author.roles)
|
||||
|
||||
# Admins can ping whoever
|
||||
user_is_admin = message.author.has_permission(Permissions.ADMINISTRATOR)
|
||||
|
||||
# Check if user in a bypass list
|
||||
user_has_bypass = False
|
||||
async for roleping in rolepings:
|
||||
if message.author.id in roleping.bypass["users"]:
|
||||
user_has_bypass = True
|
||||
break
|
||||
if any(role.id in roleping.bypass["roles"] for role in message.author.roles):
|
||||
user_has_bypass = True
|
||||
break
|
||||
|
||||
if role_in_rolepings and user_missing_role and not user_is_admin and not user_has_bypass:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Pinged a blocked role/user with a blocked role",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [
|
||||
EmbedField(
|
||||
name="Reason",
|
||||
value="Pinged a blocked role/user with a blocked role",
|
||||
inline=False,
|
||||
)
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def phishing(self, message: Message) -> None:
|
||||
"""Check if the message contains any known phishing domains."""
|
||||
for match in url.finditer(message.content):
|
||||
if match.group("domain") in self.phishing_domains:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Phishing URL",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [
|
||||
EmbedField(name="Reason", value="Phishing URL", inline=False),
|
||||
EmbedField(name="Message ID", value=str(message.id)),
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
await message.delete()
|
||||
return True
|
||||
return False
|
||||
|
||||
async def malicious_url(self, message: Message) -> None:
|
||||
"""Check if the message contains any known phishing domains."""
|
||||
for match in url.finditer(message.content):
|
||||
async with ClientSession() as session:
|
||||
resp = await session.get(
|
||||
"https://spoopy.oceanlord.me/api/check_website", json={"website": match.string}
|
||||
)
|
||||
if resp.status != 200:
|
||||
break
|
||||
data = await resp.json()
|
||||
for item in data["processed"]["urls"].values():
|
||||
if not item["safe"]:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Unsafe URL",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
reasons = ", ".join(item["not_safe_reasons"])
|
||||
fields = [
|
||||
EmbedField(name="Reason", value=f"Unsafe URL: {reasons}", inline=False),
|
||||
EmbedField(name="Message ID", valud=str(message.id)),
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
await message.delete()
|
||||
return True
|
||||
return False
|
||||
|
||||
@listen()
|
||||
async def on_message(self, event: MessageCreate) -> None:
|
||||
"""Handle on_message event. Calls other event handlers."""
|
||||
message = event.message
|
||||
if not isinstance(message.channel, DMChannel) and not message.author.bot:
|
||||
await self.autoreact(message)
|
||||
await self.massmention(message)
|
||||
await self.roleping(message)
|
||||
await self.autopurge(message)
|
||||
await self.checks(message)
|
||||
if not await self.phishing(message):
|
||||
await self.malicious_url(message)
|
||||
|
||||
@listen()
|
||||
async def on_message_edit(self, event: MessageUpdate) -> None:
|
||||
"""Process on_message_edit events."""
|
||||
before = event.before
|
||||
after = event.after
|
||||
if not before.author.bot:
|
||||
if not after.author.bot:
|
||||
modlog = await Setting.find_one(q(guild=after.guild.id, setting="modlog"))
|
||||
if modlog:
|
||||
if before.content == after.content or before.content is None:
|
||||
if not before or before.content == after.content or before.content is None:
|
||||
return
|
||||
channel = before.guild.get_channel(modlog.value)
|
||||
fields = [
|
||||
|
@ -172,19 +459,19 @@ class Jarvis(Snake):
|
|||
]
|
||||
embed = build_embed(
|
||||
title="Message Edited",
|
||||
description=f"{before.author.mention} edited a message",
|
||||
description=f"{after.author.mention} edited a message",
|
||||
fields=fields,
|
||||
color="#fc9e3f",
|
||||
timestamp=after.edited_timestamp,
|
||||
url=after.jump_url,
|
||||
)
|
||||
embed.set_author(
|
||||
name=before.author.username,
|
||||
icon_url=before.author.display_avatar.url,
|
||||
name=after.author.username,
|
||||
icon_url=after.author.display_avatar.url,
|
||||
url=after.jump_url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{before.author.username}#{before.author.discriminator} | {before.author.id}"
|
||||
text=f"{after.author.user.username}#{after.author.discriminator} | {after.author.id}"
|
||||
)
|
||||
await channel.send(embed=embed)
|
||||
if not isinstance(after.channel, DMChannel) and not after.author.bot:
|
||||
|
@ -193,7 +480,10 @@ class Jarvis(Snake):
|
|||
await self.checks(after)
|
||||
await self.roleping(after)
|
||||
await self.checks(after)
|
||||
if not await self.phishing(after):
|
||||
await self.malicious_url(after)
|
||||
|
||||
@listen()
|
||||
async def on_message_delete(self, event: MessageDelete) -> None:
|
||||
"""Process on_message_delete events."""
|
||||
message = event.message
|
||||
|
@ -245,279 +535,6 @@ class Jarvis(Snake):
|
|||
url=message.jump_url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.username}#{message.author.discriminator} | {message.author.id}"
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await channel.send(embed=embed)
|
||||
|
||||
# Events
|
||||
async def on_member_join(self, user: Member) -> None:
|
||||
"""Handle on_member_join event."""
|
||||
guild = user.guild
|
||||
unverified = await Setting.find_one(q(guild=guild.id, setting="unverified"))
|
||||
if unverified:
|
||||
role = guild.get_role(unverified.value)
|
||||
if role not in user.roles:
|
||||
await user.add_roles(role, reason="User just joined and is unverified")
|
||||
|
||||
async def autopurge(self, message: Message) -> None:
|
||||
"""Handle autopurge events."""
|
||||
autopurge = await Autopurge.find_one(q(guild=message.guild.id, channel=message.channel.id))
|
||||
if autopurge:
|
||||
await message.delete(delay=autopurge.delay)
|
||||
|
||||
async def autoreact(self, message: Message) -> None:
|
||||
"""Handle autoreact events."""
|
||||
autoreact = await Autoreact.find_one(
|
||||
q(
|
||||
guild=message.guild.id,
|
||||
channel=message.channel.id,
|
||||
)
|
||||
)
|
||||
if autoreact:
|
||||
for reaction in autoreact.reactions:
|
||||
await message.add_reaction(reaction)
|
||||
|
||||
async def checks(self, message: Message) -> None:
|
||||
"""Other message checks."""
|
||||
# #tech
|
||||
channel = find(lambda x: x.id == 599068193339736096, message.channel_mentions)
|
||||
if channel and message.author.id == 293795462752894976:
|
||||
await channel.send(
|
||||
content="https://cdn.discordapp.com/attachments/664621130044407838/805218508866453554/tech.gif" # noqa: E501
|
||||
)
|
||||
content = re.sub(r"\s+", "", message.content)
|
||||
match = invites.search(content)
|
||||
setting = await Setting.find_one(q(guild=message.guild.id, setting="noinvite"))
|
||||
if not setting:
|
||||
setting = Setting(guild=message.guild.id, setting="noinvite", value=True)
|
||||
await setting.commit()
|
||||
if match:
|
||||
guild_invites = await message.guild.invites()
|
||||
allowed = [x.code for x in guild_invites] + [
|
||||
"dbrand",
|
||||
"VtgZntXcnZ",
|
||||
"gPfYGbvTCE",
|
||||
]
|
||||
if match.group(1) not in allowed and setting.value:
|
||||
await message.delete()
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Sent an invite link",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [
|
||||
EmbedField(
|
||||
name="Reason",
|
||||
value="Sent an invite link",
|
||||
inline=False,
|
||||
)
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.nick if message.author.nick else message.author.name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}" # noqa: E501
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def massmention(self, message: Message) -> None:
|
||||
"""Handle massmention events."""
|
||||
massmention = await Setting.find_one(
|
||||
q(
|
||||
guild=message.guild.id,
|
||||
setting="massmention",
|
||||
)
|
||||
)
|
||||
if (
|
||||
massmention
|
||||
and massmention.value > 0 # noqa: W503
|
||||
and len(message.mentions) # noqa: W503
|
||||
- (1 if message.author in message.mentions else 0) # noqa: W503
|
||||
> massmention.value # noqa: W503
|
||||
):
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Mass Mention",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [EmbedField(name="Reason", value="Mass Mention", inline=False)]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.nick if message.author.nick else message.author.name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def roleping(self, message: Message) -> None:
|
||||
"""Handle roleping events."""
|
||||
rolepings = await Roleping.find(q(guild=message.guild.id, active=True))
|
||||
|
||||
if not rolepings:
|
||||
return
|
||||
|
||||
# Get all role IDs involved with message
|
||||
roles = []
|
||||
for mention in message.role_mentions:
|
||||
roles.append(mention.id)
|
||||
for mention in message.mentions:
|
||||
for role in mention.roles:
|
||||
roles.append(role.id)
|
||||
|
||||
if not roles:
|
||||
return
|
||||
|
||||
# Get all roles that are rolepinged
|
||||
roleping_ids = [r.role for r in rolepings]
|
||||
|
||||
# Get roles in rolepings
|
||||
role_in_rolepings = find_all(lambda x: x in roleping_ids, roles)
|
||||
|
||||
# Check if the user has the role, so they are allowed to ping it
|
||||
user_missing_role = any(x.id not in roleping_ids for x in message.author.roles)
|
||||
|
||||
# Admins can ping whoever
|
||||
user_is_admin = message.author.guild_permissions.ADMINISTRATOR
|
||||
|
||||
# Check if user in a bypass list
|
||||
user_has_bypass = False
|
||||
for roleping in rolepings:
|
||||
if message.author.id in roleping.bypass["users"]:
|
||||
user_has_bypass = True
|
||||
break
|
||||
if any(role.id in roleping.bypass["roles"] for role in message.author.roles):
|
||||
user_has_bypass = True
|
||||
break
|
||||
|
||||
if role_in_rolepings and user_missing_role and not user_is_admin and not user_has_bypass:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Pinged a blocked role/user with a blocked role",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [
|
||||
EmbedField(
|
||||
name="Reason",
|
||||
value="Pinged a blocked role/user with a blocked role",
|
||||
inline=False,
|
||||
)
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.nick if message.author.nick else message.author.name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
async def phishing(self, message: Message) -> None:
|
||||
"""Check if the message contains any known phishing domains."""
|
||||
for match in url.findall(message.content):
|
||||
if match in self.phishing_domains:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Phishing URL",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
fields = [EmbedField(name="Reason", value="Phishing URL", inline=False)]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.nick if message.author.nick else message.author.name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.name}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
await message.delete()
|
||||
|
||||
async def malicious_url(self, message: Message) -> None:
|
||||
"""Check if the message contains any known phishing domains."""
|
||||
for match in url.findall(message.content):
|
||||
async with ClientSession() as session:
|
||||
resp = await session.get(
|
||||
"https://spoopy.oceanlord.me/api/check_website", json={"website": match}
|
||||
)
|
||||
if resp.status != 200:
|
||||
break
|
||||
data = await resp.json()
|
||||
for item in data["processed"]["urls"].values():
|
||||
if not item["safe"]:
|
||||
w = Warning(
|
||||
active=True,
|
||||
admin=self.user.id,
|
||||
duration=24,
|
||||
guild=message.guild.id,
|
||||
reason="Unsafe URL",
|
||||
user=message.author.id,
|
||||
)
|
||||
await w.commit()
|
||||
reasons = ", ".join(item["not_safe_reasons"])
|
||||
fields = [
|
||||
EmbedField(name="Reason", value=f"Unsafe URL: {reasons}", inline=False)
|
||||
]
|
||||
embed = build_embed(
|
||||
title="Warning",
|
||||
description=f"{message.author.mention} has been warned",
|
||||
fields=fields,
|
||||
)
|
||||
embed.set_author(
|
||||
name=message.author.display_name,
|
||||
icon_url=message.author.display_avatar.url,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=f"{message.author.user.username}#{message.author.discriminator} | {message.author.id}"
|
||||
)
|
||||
await message.channel.send(embed=embed)
|
||||
await message.delete()
|
||||
break
|
||||
|
||||
@listen()
|
||||
async def on_message(self, event: MessageCreate) -> None:
|
||||
"""Handle on_message event. Calls other event handlers."""
|
||||
message = event.message
|
||||
if not isinstance(message.channel, DMChannel) and not message.author.bot:
|
||||
await self.autoreact(message)
|
||||
await self.massmention(message)
|
||||
await self.roleping(message)
|
||||
await self.autopurge(message)
|
||||
await self.checks(message)
|
||||
await self.malicious_url(message)
|
||||
|
|
Loading…
Add table
Reference in a new issue