115 lines
5.1 KiB
Python
115 lines
5.1 KiB
Python
"""JARVIS event mixin."""
|
|
import asyncio
|
|
|
|
from aiohttp import ClientSession
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Reminder, Setting
|
|
from jarvis_core.util.ansi import RESET, Fore, Format, fmt
|
|
from naff import listen
|
|
from naff.models.discord.channel import DMChannel
|
|
from naff.models.discord.embed import EmbedField
|
|
from naff.models.naff.context import Context, InteractionContext, PrefixedContext
|
|
|
|
from jarvis import const
|
|
from jarvis.client.events.components import ComponentEventMixin
|
|
from jarvis.client.events.member import MemberEventMixin
|
|
from jarvis.client.events.message import MessageEventMixin
|
|
from jarvis.tracking import jarvis_info
|
|
from jarvis.utils import build_embed
|
|
|
|
# ANSI prefixes used when rendering the activity-log "Command" embed field.
KEY_FMT = fmt(Fore.GRAY)  # precedes argument names
VAL_FMT = fmt(Fore.WHITE)  # precedes argument values
CMD_FMT = fmt(Fore.GREEN, Format.BOLD)  # precedes the invoked command name
|
|
|
|
|
|
class EventMixin(MemberEventMixin, MessageEventMixin, ComponentEventMixin):
    """Startup, ready, and command-logging event handlers for the client.

    The methods read ``guilds``, ``logger``, ``user``, ``synced`` and
    ``_update_domains`` from ``self``; none of these are defined in this class,
    so the class it is mixed into must provide them.
    """

    async def _chunk_all(self) -> None:
        """Chunk all guilds, one at a time."""
        for guild in self.guilds:
            self.logger.debug("Chunking guild %s <%s>", guild.name, guild.id)
            await guild.chunk_guild()

    async def _sync_domains(self) -> None:
        """Download the phishing domain list and store it on ``self.phishing_domains``.

        Raises:
            aiohttp.ClientResponseError: If the service answers with an error status.
        """
        self.logger.debug("Loading phishing domains")
        async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
            # Context-managed request so the response is released on every path.
            async with session.get("https://phish.sinking.yachts/v2/all") as response:
                response.raise_for_status()
                self.phishing_domains = await response.json()
                self.logger.info("Protected from %s phishing domains", len(self.phishing_domains))

    @staticmethod
    def _clean_log_value(value: object) -> object:
        """Prepare one command argument for display in the activity-log embed.

        Strings get their backticks backslash-escaped and are cut to at most
        100 characters (97 characters plus ``...``). Any other value is
        returned unchanged.
        """
        if not isinstance(value, str):
            return value
        value = value.replace("`", "\\`")
        if len(value) > 100:
            value = value[:97] + "..."
        return value

    @listen()
    async def on_startup(self) -> None:
        """NAFF on_startup override. Prometheus info generated here."""
        jarvis_info.info({"version": const.__version__})

    @listen()
    async def on_ready(self) -> None:
        """NAFF on_ready override.

        On the first call (``self.synced`` falsy) this loads the phishing
        domains, starts ``self._update_domains`` and schedules guild chunking
        in the background. Every call then logs login details and an invite
        URL, and finally queries all reminders.
        """
        try:
            if not self.synced:
                await self._sync_domains()
                self._update_domains.start()
                # Keep a reference: the event loop only holds a weak one, so an
                # unreferenced task may be garbage-collected before it finishes.
                self._chunk_task = asyncio.create_task(self._chunk_all())
                self.synced = True
        except Exception as e:
            # Deliberately broad: a failed first-time setup must not stop on_ready.
            self.logger.error("Failed to load anti-phishing", exc_info=e)

        self.logger.info("Logged in as %s", self.user)
        self.logger.info("Connected to %s guild(s)", len(self.guilds))
        self.logger.info("Current version: %s", const.__version__)
        # Logged without args, so the literal "%20" is not treated as a format spec.
        self.logger.info(
            "https://discord.com/api/oauth2/authorize?client_id="
            f"{self.user.id}&permissions=8&scope=bot%20applications.commands"
        )

        self.logger.debug("Hitting Reminders for faster loads")
        await Reminder.find().to_list(None)

    # Modlog
    async def on_command(self, ctx: Context) -> None:
        """NAFF on_command override.

        Sends a "Command Invoked" embed to the guild's ``activitylog`` channel.
        Nothing is sent for DM channels, for the ``pw`` command, or when the
        invoking channel is listed in the guild's ``log_ignore`` setting. If
        the configured channel cannot be fetched, the ``activitylog`` setting
        is deleted.

        Args:
            ctx: Context of the command that was invoked.
        """
        name = ctx.invoke_target
        if isinstance(ctx.channel, DMChannel) or name in ["pw"]:
            return

        modlog = await Setting.find_one(q(guild=ctx.guild.id, setting="activitylog"))
        ignore = await Setting.find_one(q(guild=ctx.guild.id, setting="log_ignore"))
        # NOTE(review): a guild with no "log_ignore" setting is never logged,
        # because `ignore` is None here. That may be unintended; confirm before
        # changing it to `not ignore or ...`.
        if not (modlog and ignore and ctx.channel.id not in ignore.value):
            return

        channel = await ctx.guild.fetch_channel(modlog.value)
        if not channel:
            self.logger.warning(f"Activitylog channel no longer exists in {ctx.guild.name}, removing")
            await modlog.delete()
            return

        args = []
        if isinstance(ctx, InteractionContext):
            if ctx.target_id:
                args.append(f"{KEY_FMT}context target:{VAL_FMT}{ctx.target}{RESET}")
            args.extend(
                f"{KEY_FMT}{key}:{VAL_FMT}{self._clean_log_value(value)}{RESET}"
                for key, value in ctx.kwargs.items()
            )
        elif isinstance(ctx, PrefixedContext):
            # Same escaping and truncation as the interaction branch; both are
            # rendered into the same fenced block below.
            args.extend(f"{VAL_FMT}{self._clean_log_value(value)}{RESET}" for value in ctx.args)
        arg_text = " ".join(args)

        fields = [
            EmbedField(
                name="Command",
                value=f"```ansi\n{CMD_FMT}{name}{RESET} {arg_text}\n```",
                inline=False,
            ),
        ]
        embed = build_embed(
            title="Command Invoked",
            description=f"{ctx.author.mention} invoked a command in {ctx.channel.mention}",
            fields=fields,
            color="#fc9e3f",
        )
        embed.set_author(name=ctx.author.username, icon_url=ctx.author.display_avatar.url)
        embed.set_footer(
            text=f"{ctx.author.user.username}#{ctx.author.discriminator} | {ctx.author.id}"
        )
        await channel.send(embeds=embed)