jarvis-bot/jarvis/cogs/core/util.py

604 lines
21 KiB
Python

"""JARVIS Utility Cog."""
import logging
import re
import secrets
import string
import urllib.parse
from datetime import datetime, timezone
from io import BytesIO
import numpy as np
from dateparser import parse
from interactions import Client, Extension, SlashContext
from interactions import __version__ as ipyv
from interactions.models.discord.channel import GuildCategory, GuildText, GuildVoice
from interactions.models.discord.components import Button
from interactions.models.discord.embed import EmbedField, Embed
from interactions.models.discord.enums import ButtonStyle
from interactions.models.discord.file import File
from interactions.models.discord.guild import Guild
from interactions.models.discord.role import Role
from interactions.models.discord.user import User
from interactions.models.internal.application_commands import (
CommandType,
OptionType,
SlashCommand,
SlashCommandChoice,
context_menu,
slash_command,
slash_option,
)
from interactions.models.internal.command import cooldown
from interactions.models.internal.cooldowns import Buckets
from PIL import Image
from tzlocal import get_localzone
from jarvis import const as jconst
from jarvis.data import pigpen
from jarvis.data.robotcamo import emotes, hk, names
from jarvis.utils import build_embed
# Base logo used by the `/bot logo` and `/roleinfo` commands. It is decoded once
# at import time and converted to RGBA; `convert` returns a separate image, so
# the source file can be closed as soon as the conversion is done.
# NOTE(review): the relative path is resolved against the process working
# directory - confirm the bot is always started from the directory that
# contains jarvis_small.png.
with Image.open("jarvis_small.png") as _logo_file:
    JARVIS_LOGO = _logo_file.convert("RGBA")
class UtilCog(Extension):
    """
    General-purpose utility commands for JARVIS.

    Groups bot status/metadata commands, avatar, role, user and server
    lookups, and a handful of small text and password helpers.
    """

    def __init__(self, bot: Client):
        """
        Create the cog's logger and keep a reference to the client.

        Args:
            bot: Client this extension is attached to.
        """
        self.logger = logging.getLogger(__name__)
        # The instance attribute shadows the class-level `bot` command group
        # below, so `self.bot` inside the methods refers to the client.
        self.bot = bot

    # Parent group for the `/bot ...` subcommands.
    bot = SlashCommand(name="bot", description="Bot commands")
@bot.subcommand(
    sub_cmd_name="donate", sub_cmd_description="Support the development of JARVIS"
)
async def _donate(self, ctx: SlashContext) -> None:
    """
    Reply with the donation link and a note on how tips are used.

    The subcommand is defined and registered exactly once as `/bot donate`.

    Args:
        ctx: Context of the invoking interaction.
    """
    await ctx.send(
        "Want to support JARVIS? Donate here:\n"
        "https://ko-fi.com/zevaryx\n"
        "Tips will be used to pay server costs, "
        "and any excess will go to local animal shelters.\n"
    )
@bot.subcommand(sub_cmd_name="status", sub_cmd_description="Retrieve JARVIS status")
@cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
async def _status(self, ctx: SlashContext) -> None:
    """
    Send an embed summarising the bot's current state.

    The embed lists the number of guilds and application commands, the JARVIS
    and interactions versions, the client's start time and the number of
    phishing domains held by the client. A cooldown (channel bucket, rate=1,
    interval=30) is applied through the decorator.

    Args:
        ctx: Context of the invoking interaction.
    """
    self.bot.logger.debug("Entered bot status")
    desc = (
        "All systems online"
        f"\nConnected to **{len(self.bot.guilds)}** guilds"
        f"\nListening for **{len(self.bot.application_commands)}** commands"
    )
    self.bot.logger.debug("Description made")

    # Epoch seconds of the client's start time, embedded below as a Discord
    # timestamp tag.
    online_since = int(self.bot.start_time.timestamp())
    num_domains = len(self.bot.phishing_domains)
    fields = [
        EmbedField(
            name="Version",
            value=f"[{jconst.__version__}](https://git.zevaryx.com/stark-industries/jarvis/jarvis-bot)",
            inline=True,
        ),
        EmbedField(
            name="interactions",
            value=f"[{ipyv}](https://interactionspy.readthedocs.io)",
            inline=True,
        ),
        EmbedField(name="Online Since", value=f"<t:{online_since}:F>", inline=False),
        EmbedField(
            name="Phishing Protection",
            value=f"Detecting {num_domains} phishing domains",
        ),
    ]

    embed = build_embed(
        title="JARVIS Status", description=desc, fields=fields, color="#3498db"
    )
    # The custom id carries the invoker's id; presumably a component handler
    # elsewhere uses it to decide who may delete the message.
    components = Button(
        style=ButtonStyle.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
    )
    await ctx.send(embeds=embed, components=components)
@bot.subcommand(
    sub_cmd_name="logo",
    sub_cmd_description="Get the current logo",
)
@cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
async def _logo(self, ctx: SlashContext) -> None:
    """
    Upload the module-level JARVIS logo as ``logo.png``.

    The image is serialised to PNG in memory and sent together with a delete
    button whose custom id contains the invoker's id.

    Args:
        ctx: Context of the invoking interaction.
    """
    delete_button = Button(
        style=ButtonStyle.DANGER,
        emoji="🗑️",
        custom_id=f"delete|{ctx.author.id}",
    )
    # The buffer must still be open while the file is uploaded, so the send
    # happens inside the `with` block.
    with BytesIO() as buffer:
        JARVIS_LOGO.save(buffer, "PNG")
        buffer.seek(0)  # rewind so the upload reads from the first byte
        attachment = File(buffer, file_name="logo.png")
        await ctx.send(file=attachment, components=delete_button)
# Parent group for the `/rc ...` subcommands.
rc = SlashCommand(name="rc", description="Robot Camo emoji commands")

@rc.subcommand(sub_cmd_name="hk", sub_cmd_description="Robot Camo HK416")
async def _rchk(self, ctx: SlashContext) -> None:
    """
    Send the prepared `hk` content from the robot camo data as an ephemeral reply.

    Args:
        ctx: Context of the invoking interaction.
    """
    await ctx.send(ephemeral=True, content=hk)
@rc.subcommand(
    sub_cmd_name="auto",
    sub_cmd_description="Automates robot camo letters",
)
@slash_option(
    name="text",
    description="Text to camo-ify",
    opt_type=OptionType.STRING,
    required=True,
)
async def _rcauto(self, ctx: SlashContext, text: str) -> None:
    """
    Convert text into a sequence of robot camo emote names.

    The input is upper-cased. Spaces are kept, every supported character is
    replaced by ``:<emote name>:`` (looked up through `emotes` and `names`),
    and all other characters are dropped. Every reply is ephemeral.

    Args:
        ctx: Context of the invoking interaction.
        text: Text to convert.
    """
    # Compiled once instead of being re-evaluated for every character.
    supported = re.compile(r"^[A-Z0-9-()$@!?^'#.]$")
    upper = text.upper()

    # A single unsupported character gets a specific hint rather than the
    # generic "No valid text found" reply.
    if len(text) == 1 and not re.match(r"^[A-Z0-9-()$@!?^'#. ]$", upper):
        await ctx.send("Please use ASCII characters.", ephemeral=True)
        return

    pieces = []
    for letter in upper:
        if letter == " ":
            pieces.append(" ")
        elif supported.match(letter):
            emote_id = emotes[letter]
            pieces.append(f":{names[emote_id]}:")
    to_send = "".join(pieces)

    if len(to_send) > 2000:
        await ctx.send("Too long.", ephemeral=True)
    elif not to_send.strip():
        # Either nothing matched or only spaces survived; in both cases the
        # reply would contain no emotes at all.
        await ctx.send("No valid text found", ephemeral=True)
    else:
        await ctx.send(to_send, ephemeral=True)
async def _avatar(self, ctx: SlashContext, user: User = None) -> None:
    """
    Send an embed showing a user's display avatar.

    Shared implementation behind the `/avatar` slash command and the "Avatar"
    context menu.

    Args:
        ctx: Context of the invoking interaction.
        user: User whose avatar is shown; the invoking user is used when this
            is falsy.
    """
    target = user or ctx.author
    avatar_url = target.display_avatar.url

    embed = build_embed(title="Avatar", description="", fields=[], color="#00FFEE")
    embed.set_author(name=f"{target.username}", icon_url=avatar_url)
    embed.set_image(url=avatar_url)

    delete_button = Button(
        style=ButtonStyle.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
    )
    await ctx.send(embeds=embed, components=delete_button)
@slash_command(name="emotes", description="Get all emotes")
async def _emotes(self, ctx: SlashContext) -> None:
    """
    List every custom emoji of the current guild, animated ones first.

    The emoji markup is split into chunks, one embed per chunk, and the embeds
    are sent two per message: the first pair as the command response, the
    remaining pairs as replies to that response. Any exception is logged and
    not re-raised, so on failure nothing further is sent from here.

    Args:
        ctx: Context of the invoking interaction.
    """
    try:
        emojis = list(await ctx.guild.fetch_all_custom_emojis())
        # Stable sort: animated emojis first, fetch order otherwise preserved.
        emojis.sort(key=lambda emoji: emoji.animated, reverse=True)

        animated = sum(emoji.animated for emoji in emojis)
        static = sum(not emoji.animated for emoji in emojis)

        chunks = []
        buffer = ""
        for emoji in emojis:
            # A new chunk starts once the running text has reached 2000
            # characters; the check runs before the next emoji is appended.
            if len(buffer) >= 2000:
                chunks.append(buffer.strip())
                buffer = ""
            prefix = "a" if emoji.animated else ""
            buffer += f"<{prefix}:{emoji.name}:{emoji.id}> "
        chunks.append(buffer.strip())

        first_chunk, *other_chunks = chunks
        embeds = [
            Embed(
                title=f"{static} Static, {animated} Animated, {len(emojis)} Total",
                description=first_chunk,
                color="#3498db",
            )
        ]
        embeds.extend(
            Embed(description=chunk, color="#3498db") for chunk in other_chunks
        )

        # Two embeds per message.
        pages = [embeds[start : start + 2] for start in range(0, len(embeds), 2)]
        message = await ctx.send(embeds=pages[0])
        for page in pages[1:]:
            await message.reply(embeds=page)
    except Exception as e:
        self.logger.error(f"Encountered error: {e}", exc_info=True)
@slash_command(
    name="roleinfo",
    description="Get role info",
)
@slash_option(
    name="role",
    description="Role to get info of",
    opt_type=OptionType.ROLE,
    required=True,
)
async def _roleinfo(self, ctx: SlashContext, role: Role) -> None:
    """
    Send an embed describing a role, with the logo tinted in the role colour.

    The thumbnail is produced by recolouring every non-transparent pixel of
    the JARVIS logo with the role's RGB colour and attaching the result as
    ``color_show.png``.

    Args:
        ctx: Context of the invoking interaction.
        role: Role to describe.
    """

    def yes_no(flag: bool) -> str:
        """Render a flag as the text shown in the embed."""
        return "Yes" if flag else "No"

    created_ts = int(role.created_at.timestamp())
    fields = [
        EmbedField(name="ID", value=str(role.id), inline=True),
        EmbedField(name="Name", value=role.mention, inline=True),
        EmbedField(name="Color", value=str(role.color.hex), inline=True),
        EmbedField(name="Mention", value=f"`{role.mention}`", inline=True),
        EmbedField(name="Hoisted", value=yes_no(role.hoist), inline=True),
        EmbedField(name="Position", value=str(role.position), inline=True),
        EmbedField(name="Mentionable", value=yes_no(role.mentionable), inline=True),
        EmbedField(name="Member Count", value=str(len(role.members)), inline=True),
        EmbedField(name="Created At", value=f"<t:{created_ts}:F>"),
    ]

    embed = build_embed(
        title="",
        description="",
        fields=fields,
        color=role.color,
        timestamp=role.created_at,
    )
    embed.set_footer(text="Role Created")
    # Points at the attachment uploaded with this message.
    embed.set_thumbnail(url="attachment://color_show.png")

    # Work on an array copy so the shared logo image itself is not modified.
    pixels = np.array(JARVIS_LOGO)
    opaque = pixels[..., -1] > 0  # alpha channel is the last one (RGBA)
    colour_channels = pixels[..., :-1]  # view onto every channel but alpha
    colour_channels[opaque] = list(role.color.rgb)
    tinted = Image.fromarray(pixels)

    delete_button = Button(
        style=ButtonStyle.DANGER,
        emoji="🗑️",
        custom_id=f"delete|{ctx.author.id}",
    )
    # The buffer must stay open until the upload is done.
    with BytesIO() as buffer:
        tinted.save(buffer, "PNG")
        buffer.seek(0)
        attachment = File(buffer, file_name="color_show.png")
        await ctx.send(embeds=embed, file=attachment, components=delete_button)
@slash_command(name="avatar", description="Get a user avatar")
@slash_option(
    name="user",
    description="User to view avatar of",
    opt_type=OptionType.USER,
    required=False,
)
@cooldown(bucket=Buckets.USER, rate=1, interval=5)
async def _avatar_slash(self, ctx: SlashContext, user: User = None) -> None:
    """
    Slash-command entry point for the avatar lookup.

    Delegates to `_avatar`. A cooldown (user bucket, rate=1, interval=5) is
    applied through the decorator.

    Args:
        ctx: Context of the invoking interaction.
        user: Optional user to look up; passed through unchanged.
    """
    await self._avatar(ctx, user=user)
@context_menu(name="Avatar", context_type=CommandType.USER)
async def _avatar_menu(self, ctx: SlashContext) -> None:
    """
    User context-menu entry point for the avatar lookup.

    Delegates to `_avatar` with the menu's target as the user.

    Args:
        ctx: Context of the invoking interaction; `ctx.target` is the user
            the menu was opened on.
    """
    await self._avatar(ctx, user=ctx.target)
async def _userinfo(self, ctx: SlashContext, user: User = None) -> None:
    """
    Send an embed with join/registration dates, roles and mute state of a user.

    Shared implementation behind the `/userinfo` slash command and the
    "User Info" context menu. The interaction is deferred first. If the guild
    cannot fetch the user as a member, a short notice is sent instead.

    Args:
        ctx: Context of the invoking interaction.
        user: User to describe; the invoking user is used when this is falsy.
            NOTE(review): member-only attributes (roles, joined_at,
            communication_disabled_until) are read from this object -
            confirm it is always a guild member at this point.
    """
    await ctx.defer()

    member = user or ctx.author
    if not await ctx.guild.fetch_member(member.id):
        await ctx.send("That user isn't in this guild.", ephemeral=True)
        return

    # A timeout only counts as "muted" while its end lies in the future.
    disabled_until = member.communication_disabled_until
    muted = bool(disabled_until) and disabled_until > datetime.now(tz=timezone.utc)

    roles = member.roles
    if roles:
        # Highest position first.
        roles = sorted(roles, key=lambda role: -role.position)
    role_text = " ".join([role.mention for role in roles]) if roles else "None"

    joined_ts = int(member.joined_at.timestamp())
    registered_ts = int(member.created_at.timestamp())
    fields = [
        EmbedField(name="Joined", value=f"<t:{joined_ts}:F>"),
        EmbedField(name="Registered", value=f"<t:{registered_ts}:F>"),
        EmbedField(name=f"Roles [{len(roles)}]", value=role_text, inline=False),
    ]
    if muted:
        ts = int(disabled_until.timestamp())
        fields.append(
            EmbedField(name="Muted Until", value=f"<t:{ts}:F> (<t:{ts}:R>)")
        )

    # The embed takes the colour of the highest-positioned role, if any.
    embed_color = str(roles[0].color) if roles else "#3498db"
    embed = build_embed(
        title="",
        description=member.mention,
        fields=fields,
        color=embed_color,
    )

    avatar_url = member.display_avatar.url
    mute_prefix = "🔇 " if muted else ""
    embed.set_author(name=f"{mute_prefix}{member.display_name}", icon_url=avatar_url)
    embed.set_thumbnail(url=avatar_url)
    embed.set_footer(text=f"ID: {member.id}")

    delete_button = Button(
        style=ButtonStyle.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
    )
    await ctx.send(embeds=embed, components=delete_button)
@slash_command(name="lmgtfy", description="Let me Google that for you")
@slash_option(
    name="search",
    description="What to search",
    opt_type=OptionType.STRING,
    required=True,
)
async def _lmgtfy(self, ctx: SlashContext, search: str) -> None:
    """
    Reply with a letmegooglethat.com link for the given search text.

    Args:
        ctx: Context of the invoking interaction.
        search: Search text; URL-encoded with `quote_plus` before being
            placed in the query string.
    """
    query = urllib.parse.quote_plus(search)
    await ctx.send(f"https://letmegooglethat.com/?q={query}")
@slash_command(
    name="userinfo",
    description="Get user info",
)
@slash_option(
    name="user",
    description="User to get info of",
    opt_type=OptionType.USER,
    required=False,
)
async def _userinfo_slsh(self, ctx: SlashContext, user: User = None) -> None:
    """
    Slash-command entry point for the user info lookup.

    Delegates to `_userinfo`.

    Args:
        ctx: Context of the invoking interaction.
        user: Optional user to look up; passed through unchanged.
    """
    await self._userinfo(ctx, user=user)
@context_menu(name="User Info", context_type=CommandType.USER)
async def _userinfo_menu(self, ctx: SlashContext) -> None:
    """
    User context-menu entry point for the user info lookup.

    Delegates to `_userinfo` with the menu's target as the user.

    Args:
        ctx: Context of the invoking interaction; `ctx.target` is the user
            the menu was opened on.
    """
    await self._userinfo(ctx, user=ctx.target)
@slash_command(name="serverinfo", description="Get server info")
async def _server_info(self, ctx: SlashContext) -> None:
    """
    Send an embed with general statistics about the current guild.

    Reports owner, channel/thread/member/role counts and the creation time.
    The role list is added as a field when it is shorter than 1024 characters;
    otherwise it is split into description-sized chunks that are sent as
    additional embeds. Guilds without an icon are supported.

    Args:
        ctx: Context of the invoking interaction.
    """
    guild: Guild = ctx.guild

    owner = await guild.fetch_owner()
    owner_name = (
        f"{owner.username}#{owner.discriminator}" if owner else "||`[redacted]`||"
    )

    def count_channels(kind: type) -> int:
        """Count the guild channels that are instances of `kind`."""
        return sum(isinstance(channel, kind) for channel in guild.channels)

    role_list = sorted(guild.roles, key=lambda role: role.position, reverse=True)
    roles = len(role_list)
    comma_role_list = ", ".join(role.mention for role in role_list)

    fields = [
        EmbedField(name="Owner", value=owner_name, inline=True),
        EmbedField(
            name="Channel Categories",
            value=str(count_channels(GuildCategory)),
            inline=True,
        ),
        EmbedField(
            name="Text Channels", value=str(count_channels(GuildText)), inline=True
        ),
        EmbedField(
            name="Voice Channels", value=str(count_channels(GuildVoice)), inline=True
        ),
        EmbedField(name="Threads", value=str(len(guild.threads)), inline=True),
        EmbedField(name="Members", value=str(guild.member_count), inline=True),
        EmbedField(name="Roles", value=str(roles), inline=True),
        EmbedField(
            name="Created At", value=f"<t:{int(guild.created_at.timestamp())}:F>"
        ),
    ]

    role_embeds = []
    if len(comma_role_list) < 1024:
        fields.append(
            EmbedField(
                name=f"Role List [{roles}]", value=comma_role_list, inline=False
            )
        )
    else:
        # Too long for a field: pack the mentions into chunks of at most
        # 3192 characters, one embed description per chunk.
        current_role_list = role_list[0].mention
        for role in role_list[1:]:
            candidate = f"{current_role_list}, {role.mention}"
            if len(candidate) > 3192:
                role_embeds.append(
                    build_embed(title="", description=current_role_list, fields=[])
                )
                current_role_list = role.mention
            else:
                current_role_list = candidate
        role_embeds.append(
            build_embed(title="", description=current_role_list, fields=[])
        )

    embed = build_embed(
        title="", description="", fields=fields, timestamp=guild.created_at
    )
    # A guild without an icon has no asset to read a URL from.
    icon_url = guild.icon.url if guild.icon else None
    embed.set_author(name=guild.name, icon_url=icon_url)
    if icon_url:
        embed.set_thumbnail(url=icon_url)
    embed.set_footer(text=f"ID: {guild.id} | Server Created")

    components = Button(
        style=ButtonStyle.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
    )
    # Discord caps the combined text of all embeds in one message, so only the
    # first role chunk travels with the main embed; any further chunks go out
    # as separate messages.
    await ctx.send(embeds=[embed, *role_embeds[:1]], components=components)
    for extra_embed in role_embeds[1:]:
        await ctx.send(embeds=extra_embed)
@slash_command(
    name="pw",
    sub_cmd_name="gen",
    description="Generate a secure password",
)
@slash_option(
    name="length",
    description="Password length (default 32)",
    opt_type=OptionType.INTEGER,
    required=False,
)
@slash_option(
    name="chars",
    description="Characters to include (default last option)",
    opt_type=OptionType.INTEGER,
    required=False,
    choices=[
        SlashCommandChoice(name="A-Za-z", value=0),
        SlashCommandChoice(name="A-Fa-f0-9", value=1),
        SlashCommandChoice(name="A-Za-z0-9", value=2),
        SlashCommandChoice(name="A-Za-z0-9!@#$%^&*", value=3),
    ],
)
@cooldown(bucket=Buckets.USER, rate=1, interval=15)
async def _pw_gen(
    self, ctx: SlashContext, length: int = 32, chars: int = 3
) -> None:
    """
    Generate a random password and send it as an ephemeral message.

    Characters are drawn with `secrets.choice`. Lengths below 1 or above 256
    are rejected with an ephemeral notice.

    Args:
        ctx: Context of the invoking interaction.
        length: Number of characters to generate (1-256).
        chars: Index of the alphabet to draw from; matches the `value` of the
            choices declared on the option above.
    """
    if length < 1:
        # Without this guard a zero or negative length produced an empty
        # "password".
        await ctx.send("Password length must be at least 1", ephemeral=True)
        return
    if length > 256:
        await ctx.send("Please limit password to 256 characters", ephemeral=True)
        return

    # Order mirrors the option's choice values 0-3.
    alphabets = (
        string.ascii_letters,
        string.hexdigits,
        string.ascii_letters + string.digits,
        string.ascii_letters + string.digits + "!@#$%^&*",
    )
    alphabet = alphabets[chars]
    pw = "".join(secrets.choice(alphabet) for _ in range(length))

    await ctx.send(
        f"Generated password:\n`{pw}`\n\n"
        '**WARNING: Once you press "Dismiss Message", '
        "*the password is lost forever***",
        ephemeral=True,
    )
@slash_command(name="pigpen", description="Encode a string into pigpen")
@slash_option(
    name="text",
    description="Text to encode",
    opt_type=OptionType.STRING,
    required=True,
)
async def _pigpen(self, ctx: SlashContext, text: str) -> None:
    """
    Encode text with the pigpen lookup table and send it in inline-code markup.

    Each character is lower-cased and replaced through `pigpen.lookup` when it
    has an entry; backticks without an entry are dropped so they cannot end
    the code span early; everything else passes through unchanged. A single
    space follows every emitted symbol. Long results are truncated so the
    whole message, including both backticks, fits in 2000 characters.

    Args:
        ctx: Context of the invoking interaction.
        text: Text to encode.
    """
    pieces = []
    for char in text:
        char = char.lower()
        if char in pigpen.lookup:
            symbol = pigpen.lookup[char]
        elif char == " ":
            symbol = " "
        elif char == "`":
            continue
        else:
            symbol = char
        pieces.append(symbol + " ")

    # Truncate the body, not the finished message, so the closing backtick
    # is never cut off (2000 minus the two backticks).
    body = "".join(pieces)[:1998]
    await ctx.send(f"`{body}`")
@slash_command(
    name="timestamp",
    description="Convert a datetime or timestamp into it's counterpart",
)
@slash_option(
    name="string",
    description="String to convert",
    opt_type=OptionType.STRING,
    required=True,
)
@slash_option(
    name="private",
    description="Respond quietly?",
    opt_type=OptionType.BOOLEAN,
    required=False,
)
async def _timestamp(
    self, ctx: SlashContext, string: str, private: bool = False
) -> None:
    """
    Parse a date/time string and show it in several timestamp formats.

    The text is parsed with `dateparser.parse`. When nothing can be parsed an
    ephemeral notice is sent. Otherwise an embed lists the epoch seconds, the
    Discord timestamp markup (rendered and as literal text) and the ISO 8601
    form.

    Args:
        ctx: Context of the invoking interaction.
        string: Text to parse. Inside this function the name shadows the
            imported `string` module.
        private: Send the result as an ephemeral message when true.
    """
    parsed = parse(string)
    if not parsed:
        await ctx.send("Valid time not found, try again", ephemeral=True)
        return

    if not parsed.tzinfo:
        # Naive results are tagged with the host's local zone and then
        # converted to UTC.
        localized = parsed.replace(tzinfo=get_localzone())
        parsed = localized.astimezone(tz=timezone.utc)
    as_utc = parsed.astimezone(tz=timezone.utc)

    ts = int(parsed.timestamp())
    ts_utc = int(as_utc.timestamp())

    labelled_values = (
        ("Unix Epoch", f"`{ts}`"),
        ("Unix Epoch (UTC)", f"`{ts_utc}`"),
        ("Absolute Time", f"<t:{ts_utc}:F>\n`<t:{ts_utc}:F>`"),
        ("Relative Time", f"<t:{ts_utc}:R>\n`<t:{ts_utc}:R>`"),
        ("ISO8601", parsed.isoformat()),
    )
    fields = [EmbedField(name=label, value=text) for label, text in labelled_values]

    embed = build_embed(
        title="Converted Time", description=f"`{string}`", fields=fields
    )
    delete_button = Button(
        style=ButtonStyle.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
    )
    await ctx.send(embeds=embed, ephemeral=private, components=delete_button)
@bot.subcommand(sub_cmd_name="support", sub_cmd_description="Got issues?")
async def _support(self, ctx: SlashContext) -> None:
    """
    Reply with the link for reporting problems with the bot.

    The message mentions the bot's own user.

    Args:
        ctx: Context of the invoking interaction.
    """
    bot_mention = self.bot.user.mention
    await ctx.send(
        f"\nRun into issues with {bot_mention}? Please report them here!\n"
        "https://s.zevs.me/jarvis-support\n"
        "We'll help as best we can with whatever issues you encounter.\n"
    )
@bot.subcommand(
    sub_cmd_name="privacy_terms",
    sub_cmd_description="View Privacy and Terms of Use",
)
async def _privacy_terms(self, ctx: SlashContext) -> None:
    """
    Reply with the links to the privacy statement and the terms of use.

    Args:
        ctx: Context of the invoking interaction.
    """
    await ctx.send(
        "\nView the privacy statement here: https://s.zevs.me/jarvis-privacy\n"
        "View the terms of use here: https://s.zevs.me/jarvis-terms\n"
    )
def setup(bot: Client) -> None:
    """
    Add UtilCog to JARVIS.

    Instantiates the cog with the given client; the instance itself is not
    kept or returned here.

    Args:
        bot: Client the cog is created for.
    """
    UtilCog(bot)