jarvis-bot/jarvis/client.py
2022-03-30 10:13:18 -06:00

543 lines
23 KiB
Python

"""Custom JARVIS client."""
import logging
import re
import traceback
from datetime import datetime, timezone
from aiohttp import ClientSession
from dis_snek import Snake, listen
from dis_snek.api.events.discord import MessageCreate, MessageDelete, MessageUpdate
from dis_snek.client.errors import CommandCheckFailure, CommandOnCooldown
from dis_snek.client.utils.misc_utils import find_all
from dis_snek.models.discord.channel import DMChannel
from dis_snek.models.discord.embed import EmbedField
from dis_snek.models.discord.enums import Permissions
from dis_snek.models.discord.message import Message
from dis_snek.models.discord.user import Member
from dis_snek.models.snek.context import Context, InteractionContext
from dis_snek.models.snek.tasks.task import Task
from dis_snek.models.snek.tasks.triggers import IntervalTrigger
from jarvis_core.db import q
from jarvis_core.db.models import Autopurge, Autoreact, Roleping, Setting, Warning
from jarvis_core.filters import invites, url
from jarvis_core.util import build_embed
from jarvis_core.util.ansi import RESET, Fore, Format, fmt
from pastypy import AsyncPaste as Paste
from jarvis.utils.embeds import warning_embed
DEFAULT_GUILD = 862402786116763668
DEFAULT_ERROR_CHANNEL = 943395824560394250
DEFAULT_SITE = "https://paste.zevs.me"
ERROR_MSG = """
Command Information:
Name: {invoked_name}
Args:
{arg_str}
Callback:
Args:
{callback_args}
Kwargs:
{callback_kwargs}
"""
KEY_FMT = fmt(Fore.GRAY)
VAL_FMT = fmt(Fore.WHITE)
CMD_FMT = fmt(Fore.GREEN, Format.BOLD)
class Jarvis(Snake):
def __init__(self, *args, **kwargs): # noqa: ANN002 ANN003
super().__init__(*args, **kwargs)
self.logger = logging.getLogger(__name__)
self.phishing_domains = []
self.pre_run_callback = self._prerun
@Task.create(IntervalTrigger(days=1))
async def _update_domains(self) -> None:
self.logger.debug("Updating phishing domains")
async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
response = await session.get("https://phish.sinking.yachts/v2/recent/86415")
response.raise_for_status()
data = await response.json()
self.logger.debug(f"Found {len(data)} changes to phishing domains")
for update in data:
if update["type"] == "add":
if update["domain"] not in self.phishing_domains:
self.phishing_domains.append(update["domain"])
elif update["type"] == "delete":
if update["domain"] in self.phishing_domains:
self.phishing_domains.remove(update["domain"])
async def _prerun(self, ctx: InteractionContext, *args, **kwargs) -> None:
name = ctx.invoked_name
if ctx.target:
kwargs["context target"] = ctx.target
args = " ".join(f"{k}:{v}" for k, v in kwargs.items())
self.logger.debug(f"Running command `{name}` with args: {args or 'None'}")
async def _sync_domains(self) -> None:
self.logger.debug("Loading phishing domains")
async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
response = await session.get("https://phish.sinking.yachts/v2/all")
response.raise_for_status()
self.phishing_domains = await response.json()
self.logger.info(f"Protected from {len(self.phishing_domains)} phishing domains")
@listen()
async def on_ready(self) -> None:
"""Lepton on_ready override."""
await self._sync_domains()
self._update_domains.start()
self.logger.info("Logged in as {}".format(self.user)) # noqa: T001
self.logger.info("Connected to {} guild(s)".format(len(self.guilds))) # noqa: T001
self.logger.info( # noqa: T001
"https://discord.com/api/oauth2/authorize?client_id="
"{}&permissions=8&scope=bot%20applications.commands".format(self.user.id)
)
async def on_command_error(
self, ctx: Context, error: Exception, *args: list, **kwargs: dict
) -> None:
"""Lepton on_command_error override."""
self.logger.debug(f"Handling error in {ctx.invoked_name}: {error}")
if isinstance(error, CommandOnCooldown):
await ctx.send(str(error), ephemeral=True)
return
elif isinstance(error, CommandCheckFailure):
await ctx.send("I'm afraid I can't let you do that", ephemeral=True)
return
guild = await self.fetch_guild(DEFAULT_GUILD)
channel = await guild.fetch_channel(DEFAULT_ERROR_CHANNEL)
error_time = datetime.now(tz=timezone.utc).strftime("%d-%m-%Y %H:%M-%S.%f UTC")
timestamp = int(datetime.now().timestamp())
timestamp = f"<t:{timestamp}:T>"
arg_str = ""
if ctx.target:
ctx.kwargs["context target"] = ctx.target
for k, v in ctx.kwargs.items():
arg_str += f" {k}: "
if isinstance(v, str) and len(v) > 100:
v = v[97] + "..."
arg_str += f"{v}\n"
callback_args = "\n".join(f" - {i}" for i in args) if args else " None"
callback_kwargs = (
"\n".join(f" {k}: {v}" for k, v in kwargs.items()) if kwargs else " None"
)
full_message = ERROR_MSG.format(
error_time=error_time,
invoked_name=ctx.invoked_name,
arg_str=arg_str,
callback_args=callback_args,
callback_kwargs=callback_kwargs,
)
error_message = "".join(traceback.format_exception(error))
if len(full_message + error_message) >= 1800:
error_message = "\n ".join(error_message.split("\n"))
full_message += "Exception: |\n " + error_message
paste = Paste(content=full_message, site=DEFAULT_SITE)
key = await paste.save()
self.logger.debug(f"Large traceback, saved to Pasty {paste.id}, {key=}")
await channel.send(
f"JARVIS encountered an error at {timestamp}. Log too big to send over Discord."
f"\nPlease see log at {paste.url}"
)
else:
await channel.send(
f"JARVIS encountered an error at {timestamp}:"
f"\n```yaml\n{full_message}\n```"
f"\nException:\n```py\n{error_message}\n```"
)
await ctx.send("Whoops! Encountered an error. The error has been logged.", ephemeral=True)
return await super().on_command_error(ctx, error, *args, **kwargs)
# Modlog
async def on_command(self, ctx: InteractionContext) -> None:
"""Lepton on_command override."""
if not isinstance(ctx.channel, DMChannel) and ctx.invoked_name not in ["pw"]:
modlog = await Setting.find_one(q(guild=ctx.guild.id, setting="modlog"))
if modlog:
channel = await ctx.guild.fetch_channel(modlog.value)
args = []
if ctx.target:
args.append(f"{KEY_FMT}context target:{VAL_FMT}{ctx.target}{RESET}")
for k, v in ctx.kwargs.items():
if isinstance(v, str):
v = v.replace("`", "\\`")
if len(v) > 100:
v = v[:97] + "..."
args.append(f"{KEY_FMT}{k}:{VAL_FMT}{v}{RESET}")
args = " ".join(args)
fields = [
EmbedField(
name="Command",
value=f"```ansi\n{CMD_FMT}{ctx.invoked_name}{RESET} {args}\n```",
inline=False,
),
]
embed = build_embed(
title="Command Invoked",
description=f"{ctx.author.mention} invoked a command in {ctx.channel.mention}",
fields=fields,
color="#fc9e3f",
)
embed.set_author(name=ctx.author.username, icon_url=ctx.author.display_avatar.url)
embed.set_footer(
text=f"{ctx.author.user.username}#{ctx.author.discriminator} | {ctx.author.id}"
)
await channel.send(embed=embed)
# Events
async def on_member_join(self, user: Member) -> None:
"""Handle on_member_join event."""
guild = user.guild
unverified = await Setting.find_one(q(guild=guild.id, setting="unverified"))
if unverified:
self.logger.debug(f"Applying unverified role to {user.id} in {guild.id}")
role = await guild.fetch_role(unverified.value)
if role not in user.roles:
await user.add_role(role, reason="User just joined and is unverified")
async def autopurge(self, message: Message) -> None:
"""Handle autopurge events."""
autopurge = await Autopurge.find_one(q(guild=message.guild.id, channel=message.channel.id))
if autopurge:
self.logger.debug(
f"Autopurging message {message.guild.id}/{message.channel.id}/{message.id}"
)
await message.delete(delay=autopurge.delay)
async def autoreact(self, message: Message) -> None:
"""Handle autoreact events."""
autoreact = await Autoreact.find_one(
q(
guild=message.guild.id,
channel=message.channel.id,
)
)
if autoreact:
self.logger.debug(
f"Autoreacting to message {message.guild.id}/{message.channel.id}/{message.id}"
)
for reaction in autoreact.reactions:
await message.add_reaction(reaction)
if autoreact.thread:
name = message.content
if len(name) > 100:
name = name[:97] + "..."
await message.create_thread(name=message.content, reason="Autoreact")
async def checks(self, message: Message) -> None:
"""Other message checks."""
# #tech
# channel = find(lambda x: x.id == 599068193339736096, message._mention_ids)
# if channel and message.author.id == 293795462752894976:
# await channel.send(
# content="https://cdn.discordapp.com/attachments/664621130044407838/805218508866453554/tech.gif" # noqa: E501
# )
content = re.sub(r"\s+", "", message.content)
match = invites.search(content)
setting = await Setting.find_one(q(guild=message.guild.id, setting="noinvite"))
if not setting:
setting = Setting(guild=message.guild.id, setting="noinvite", value=True)
await setting.commit()
if match:
guild_invites = await message.guild.fetch_invites()
if message.guild.vanity_url_code:
guild_invites.append(message.guild.vanity_url_code)
allowed = [x.code for x in guild_invites] + [
"dbrand",
"VtgZntXcnZ",
"gPfYGbvTCE",
]
if (m := match.group(1)) not in allowed and setting.value:
self.logger.debug(f"Removing non-allowed invite `{m}` from {message.guild.id}")
try:
await message.delete()
except Exception:
self.logger.debug("Message deleted before action taken")
await Warning(
active=True,
admin=self.user.id,
duration=24,
guild=message.guild.id,
reason="Sent an invite link",
user=message.author.id,
).commit()
embed = warning_embed(message.author, "Sent an invite link")
await message.channel.send(embed=embed)
async def massmention(self, message: Message) -> None:
"""Handle massmention events."""
massmention = await Setting.find_one(
q(
guild=message.guild.id,
setting="massmention",
)
)
if (
massmention
and massmention.value > 0 # noqa: W503
and len(message._mention_ids + message._mention_roles) # noqa: W503
- (1 if message.author.id in message._mention_ids else 0) # noqa: W503
> massmention.value # noqa: W503
):
self.logger.debug(
f"Massmention threshold on {message.guild.id}/{message.channel.id}/{message.id}"
)
await Warning(
active=True,
admin=self.user.id,
duration=24,
guild=message.guild.id,
reason="Mass Mention",
user=message.author.id,
).commit()
embed = warning_embed(message.author, "Mass Mention")
await message.channel.send(embed=embed)
async def roleping(self, message: Message) -> None:
"""Handle roleping events."""
if await Roleping.collection.count_documents(q(guild=message.guild.id, active=True)) == 0:
return
rolepings = await Roleping.find(q(guild=message.guild.id, active=True)).to_list(None)
# Get all role IDs involved with message
roles = []
async for mention in message.mention_roles:
roles.append(mention.id)
async for mention in message.mention_users:
for role in mention.roles:
roles.append(role.id)
if not roles:
return
# Get all roles that are rolepinged
roleping_ids = [r.role for r in rolepings]
# Get roles in rolepings
role_in_rolepings = find_all(lambda x: x in roleping_ids, roles)
# Check if the user has the role, so they are allowed to ping it
user_missing_role = any(x.id not in roleping_ids for x in message.author.roles)
# Admins can ping whoever
user_is_admin = message.author.has_permission(Permissions.ADMINISTRATOR)
# Check if user in a bypass list
user_has_bypass = False
for roleping in rolepings:
if message.author.id in roleping.bypass["users"]:
user_has_bypass = True
break
if any(role.id in roleping.bypass["roles"] for role in message.author.roles):
user_has_bypass = True
break
if role_in_rolepings and user_missing_role and not user_is_admin and not user_has_bypass:
self.logger.debug(
f"Rolepinged role in {message.guild.id}/{message.channel.id}/{message.id}"
)
await Warning(
active=True,
admin=self.user.id,
duration=24,
guild=message.guild.id,
reason="Pinged a blocked role/user with a blocked role",
user=message.author.id,
).commit()
embed = warning_embed(message.author, "Pinged a blocked role/user with a blocked role")
await message.channel.send(embed=embed)
async def phishing(self, message: Message) -> None:
"""Check if the message contains any known phishing domains."""
for match in url.finditer(message.content):
if (m := match.group("domain")) in self.phishing_domains:
self.logger.debug(
f"Phishing url `{m}` detected in {message.guild.id}/{message.channel.id}/{message.id}"
)
await Warning(
active=True,
admin=self.user.id,
duration=24,
guild=message.guild.id,
reason="Phishing URL",
user=message.author.id,
).commit()
embed = warning_embed(message.author, "Phishing URL")
await message.channel.send(embed=embed)
await message.delete()
return True
return False
async def malicious_url(self, message: Message) -> None:
"""Check if the message contains any known phishing domains."""
for match in url.finditer(message.content):
async with ClientSession() as session:
resp = await session.get(
"https://spoopy.oceanlord.me/api/check_website", json={"website": match.string}
)
if resp.status != 200:
break
data = await resp.json()
for item in data["processed"]["urls"].values():
if not item["safe"]:
self.logger.debug(
f"Scam url `{match.string}` detected in {message.guild.id}/{message.channel.id}/{message.id}"
)
await Warning(
active=True,
admin=self.user.id,
duration=24,
guild=message.guild.id,
reason="Unsafe URL",
user=message.author.id,
).commit()
reasons = ", ".join(item["not_safe_reasons"])
embed = warning_embed(message.author, reasons)
await message.channel.send(embed=embed)
await message.delete()
return True
return False
@listen()
async def on_message(self, event: MessageCreate) -> None:
"""Handle on_message event. Calls other event handlers."""
message = event.message
if not isinstance(message.channel, DMChannel) and not message.author.bot:
await self.autoreact(message)
await self.massmention(message)
await self.roleping(message)
await self.autopurge(message)
await self.checks(message)
if not await self.phishing(message):
await self.malicious_url(message)
@listen()
async def on_message_edit(self, event: MessageUpdate) -> None:
"""Process on_message_edit events."""
before = event.before
after = event.after
if not after.author.bot:
modlog = await Setting.find_one(q(guild=after.guild.id, setting="modlog"))
if modlog:
if not before or before.content == after.content or before.content is None:
return
try:
channel = before.guild.get_channel(modlog.value)
fields = [
EmbedField(
"Original Message",
before.content if before.content else "N/A",
False,
),
EmbedField(
"New Message",
after.content if after.content else "N/A",
False,
),
]
embed = build_embed(
title="Message Edited",
description=f"{after.author.mention} edited a message in {before.channel.mention}",
fields=fields,
color="#fc9e3f",
timestamp=after.edited_timestamp,
url=after.jump_url,
)
embed.set_author(
name=after.author.username,
icon_url=after.author.display_avatar.url,
url=after.jump_url,
)
embed.set_footer(
text=f"{after.author.username}#{after.author.discriminator} | {after.author.id}"
)
await channel.send(embed=embed)
except Exception as e:
self.logger.warn(
f"Failed to process edit {before.guild.id}/{before.channel.id}/{before.id}: {e}"
)
if not isinstance(after.channel, DMChannel) and not after.author.bot:
await self.massmention(after)
await self.roleping(after)
await self.checks(after)
await self.roleping(after)
await self.checks(after)
if not await self.phishing(after):
await self.malicious_url(after)
@listen()
async def on_message_delete(self, event: MessageDelete) -> None:
"""Process on_message_delete events."""
message = event.message
modlog = await Setting.find_one(q(guild=message.guild.id, setting="modlog"))
if modlog:
try:
content = message.content or "N/A"
except AttributeError:
content = "N/A"
fields = [EmbedField("Original Message", content, False)]
try:
if message.attachments:
value = "\n".join([f"[{x.filename}]({x.url})" for x in message.attachments])
fields.append(
EmbedField(
name="Attachments",
value=value,
inline=False,
)
)
if message.sticker_items:
value = "\n".join([f"Sticker: {x.name}" for x in message.sticker_items])
fields.append(
EmbedField(
name="Stickers",
value=value,
inline=False,
)
)
if message.embeds:
value = str(len(message.embeds)) + " embeds"
fields.append(
EmbedField(
name="Embeds",
value=value,
inline=False,
)
)
channel = message.guild.get_channel(modlog.value)
embed = build_embed(
title="Message Deleted",
description=f"{message.author.mention}'s message was deleted from {message.channel.mention}",
fields=fields,
color="#fc9e3f",
)
embed.set_author(
name=message.author.username,
icon_url=message.author.display_avatar.url,
url=message.jump_url,
)
embed.set_footer(
text=(
f"{message.author.username}#{message.author.discriminator} | "
f"{message.author.id}"
)
)
await channel.send(embed=embed)
except Exception as e:
self.logger.warn(
f"Failed to process edit {message.guild.id}/{message.channel.id}/{message.id}: {e}"
)