# jarvis-bot/jarvis/client/events/__init__.py
#
# 156 lines
# 7.3 KiB
# Python

"""JARVIS event mixin."""
import asyncio
from aiohttp import ClientSession
from beanie.operators import Set
from interactions import listen
from interactions.ext.prefixed_commands.context import PrefixedContext
from interactions.models.discord.channel import DMChannel
from interactions.models.discord.embed import EmbedField
from interactions.models.internal.application_commands import ContextMenu
from interactions.models.internal.context import BaseContext, InteractionContext
from jarvis_core.db import q
from jarvis_core.db.models import Reminder, Setting
from jarvis_core.util.ansi import RESET, Fore, Format, fmt
from statipy.db import StaticStat
from jarvis import const
from jarvis.client.events.components import ComponentEventMixin
from jarvis.client.events.member import MemberEventMixin
from jarvis.client.events.message import MessageEventMixin
from jarvis.utils import build_embed
# Formatting prefixes built with jarvis_core's ``fmt`` helper; they are
# interpolated into the ```ansi code block that ``EventMixin.on_command``
# sends to the activity log, and each styled span is terminated with RESET.
# Prefix placed before an argument name (and the "context target:" label).
KEY_FMT = fmt(Fore.GRAY)
# Prefix placed before an argument value.
VAL_FMT = fmt(Fore.WHITE)
# Prefix placed before the invoked command's name.
CMD_FMT = fmt(Fore.GREEN, Format.BOLD)
class EventMixin(MemberEventMixin, MessageEventMixin, ComponentEventMixin):
    """Client mixin providing startup handling and command activity logging.

    Combines the member, message and component event mixins and adds the
    startup listener and the ``on_command`` hook. The methods below read
    attributes that are not defined in this class (``logger``, ``user``,
    ``guilds``, ``synced``, ``client_name``, ``interaction_tree``,
    ``_update_domains``); they must be supplied by the class this mixin is
    combined with.
    """

    async def _chunk_all(self) -> None:
        """Chunk all guilds.

        Deprecated: the previous per-guild chunking has been retired, so this
        only logs a warning and does nothing else.
        """
        self.logger.warning("Deprecated function, nothing will happen")

    async def _sync_domains(self) -> None:
        """Download the phishing domain list and store it on the client.

        The decoded JSON payload is assigned to ``self.phishing_domains``.

        Raises:
            aiohttp.ClientResponseError: If the service replies with an HTTP
                error status.
        """
        self.logger.debug("Loading phishing domains")
        async with ClientSession(headers={"X-Identity": "Discord: zevaryx#5779"}) as session:
            # Context manager guarantees the connection is released even when
            # raise_for_status() or JSON decoding fails.
            async with session.get("https://phish.sinking.yachts/v2/all") as response:
                response.raise_for_status()
                self.phishing_domains = await response.json()
        self.logger.info(f"Protected from {len(self.phishing_domains)} phishing domains")

    @listen()
    async def on_startup(self) -> None:
        """NAFF on_startup override. Prometheus info generated here.

        Steps, in order:
          1. Upsert the ``jarvis_version`` StaticStat for this client.
          2. On the first run only (``self.synced`` is falsy), load the
             phishing domains, start ``_update_domains`` and schedule
             ``_chunk_all``. Any failure is logged and startup continues.
          3. Log identity, guild count, version and the invite URL.
          4. Log how many commands / context menus were registered.
          5. Query all reminders once ("for faster loads").
        """
        await StaticStat.find_one(StaticStat.name == "jarvis_version", StaticStat.client_id == self.user.id).upsert(
            Set(
                {
                    StaticStat.client_name: self.client_name,
                    StaticStat.value: const.__version__,
                }
            ),
            on_insert=StaticStat(
                name="jarvis_version",
                client_id=self.user.id,
                client_name=self.client_name,
                value=const.__version__,
            ),
        )

        # Best-effort: anti-phishing problems must not prevent the bot from
        # finishing startup, so everything here is caught and logged.
        try:
            if not self.synced:
                await self._sync_domains()
                self._update_domains.start()
                asyncio.create_task(self._chunk_all())
                self.synced = True
        except Exception as e:
            self.logger.error("Failed to load anti-phishing", exc_info=e)

        self.logger.info(f"Logged in as {self.user}")
        self.logger.info(f"Connected to {len(self.guilds)} guild(s)")
        self.logger.info(f"Current version: {const.__version__}")
        self.logger.info(
            "https://discord.com/api/oauth2/authorize?client_id="
            f"{self.user.id}&permissions=8&scope=bot%20applications.commands"
        )

        self._log_loaded_commands()

        self.logger.debug("Hitting Reminders for faster loads")
        await Reminder.find().to_list(None)

    def _log_loaded_commands(self) -> None:
        """Log how many base commands and context menus are registered.

        Entries under scope ``0`` of ``self.interaction_tree`` are reported as
        global; every other scope is summed into the guild totals. Any error
        while walking the tree is logged instead of raised.
        """
        global_base_commands = 0
        guild_base_commands = 0
        global_context_menus = 0
        guild_context_menus = 0
        try:
            for scope, commands in self.interaction_tree.items():
                menus = sum(1 for cmd in commands.values() if isinstance(cmd, ContextMenu))
                base = len(commands) - menus
                if scope == 0:
                    global_base_commands = base
                    global_context_menus = menus
                else:
                    guild_base_commands += base
                    guild_context_menus += menus
        except Exception:
            self.logger.error("interaction_tree not found, try updating NAFF")
            return

        self.logger.info(f"Loaded {global_base_commands:>3} global base slash commands")
        self.logger.info(f"Loaded {global_context_menus:>3} global context menus")
        self.logger.info(f"Loaded {guild_base_commands:>3} guild base slash commands")
        self.logger.info(f"Loaded {guild_context_menus:>3} guild context menus")

    @staticmethod
    def _clip_arg(value: object, limit: int = 100) -> object:
        """Make a command argument safe to show inside the log's code block.

        Non-string values are returned untouched. Strings get their backticks
        backslash-prefixed (so user input cannot contain three consecutive
        backticks and close the code block early) and are shortened to at
        most ``limit`` characters, ending in ``...`` when cut.
        """
        if not isinstance(value, str):
            return value
        value = value.replace("`", "\\`")
        if len(value) > limit:
            value = value[: limit - 3] + "..."
        return value

    # Modlog
    async def on_command(self, ctx: BaseContext) -> None:
        """NAFF on_command override.

        Sends a "Command Invoked" embed to the guild's activity log channel.
        Nothing is logged for DM channels, for the ``pw`` command, when the
        guild has no ``activitylog`` setting, or when the channel is listed
        in the ``log_ignore`` setting. If the configured log channel can no
        longer be fetched, the ``activitylog`` setting is deleted.

        Args:
            ctx: Context of the command that was invoked.
        """
        name = ctx.invoke_target
        if isinstance(ctx.channel, DMChannel) or name in ["pw"]:
            return

        modlog = await Setting.find_one(q(guild=ctx.guild.id, setting="activitylog"))
        ignore = await Setting.find_one(q(guild=ctx.guild.id, setting="log_ignore"))
        # NOTE(review): a guild without any `log_ignore` setting is never
        # logged because `ignore` must be truthy here. That looks unintended,
        # but the behaviour is kept as-is - confirm against how the settings
        # are created before relaxing this to "not ignore or ...".
        if not modlog or not ignore or ctx.channel.id in ignore.value:
            return

        channel = await ctx.guild.fetch_channel(modlog.value)
        if not channel:
            self.logger.warning(f"Activitylog channel no longer exists in {ctx.guild.name}, removing")
            await modlog.delete()
            return

        parts = []
        if isinstance(ctx, InteractionContext):
            if ctx.target_id:
                parts.append(f"{KEY_FMT}context target:{VAL_FMT}{ctx.target}{RESET}")
            parts.extend(
                f"{KEY_FMT}{key}:{VAL_FMT}{self._clip_arg(value)}{RESET}" for key, value in ctx.kwargs.items()
            )
        elif isinstance(ctx, PrefixedContext):
            # Positional arguments get the same escaping and truncation as
            # interaction kwargs, since both end up in the same code block.
            parts.extend(f"{VAL_FMT}{self._clip_arg(value)}{RESET}" for value in ctx.args)
        rendered_args = " ".join(parts)

        fields = [
            EmbedField(
                name="Command",
                value=f"```ansi\n{CMD_FMT}{ctx.invoke_target}{RESET} {rendered_args}\n```",
                inline=False,
            ),
        ]
        embed = build_embed(
            title="Command Invoked",
            description=f"{ctx.author.mention} invoked a command in {ctx.channel.mention}",
            fields=fields,
            color="#fc9e3f",
        )
        embed.set_author(name=ctx.author.username, icon_url=ctx.author.display_avatar.url)
        embed.set_footer(text=f"{ctx.author.user.username}#{ctx.author.discriminator} | {ctx.author.id}")
        await channel.send(embeds=embed)