"""JARVIS Utility Cog."""
import logging
import re
import secrets
import string
from datetime import timezone
from io import BytesIO

import numpy as np
from dateparser import parse
from naff import Client, Cog, InteractionContext, const
from naff.models.discord.channel import GuildCategory, GuildText, GuildVoice
from naff.models.discord.embed import EmbedField
from naff.models.discord.file import File
from naff.models.discord.guild import Guild
from naff.models.discord.role import Role
from naff.models.discord.user import Member, User
from naff.models.naff.application_commands import (
    CommandTypes,
    OptionTypes,
    SlashCommandChoice,
    context_menu,
    slash_command,
    slash_option,
)
from naff.models.naff.command import cooldown
from naff.models.naff.cooldowns import Buckets
from PIL import Image
from tzlocal import get_localzone

from jarvis import const as jconst
from jarvis.data import pigpen
from jarvis.data.robotcamo import emotes, hk, names
from jarvis.utils import build_embed, get_repo_hash

JARVIS_LOGO = Image.open("jarvis_small.png").convert("RGBA")

# Characters accepted by /rcauto. The single-character pre-check also accepts a
# space; the per-letter check does not, because spaces are handled separately.
_CAMO_SINGLE_RE = re.compile(r"^[A-Z0-9-()$@!?^'#. ]$")
_CAMO_LETTER_RE = re.compile(r"^[A-Z0-9-()$@!?^'#.]$")

# Alphabets for /pw gen, indexed by the value of the `chars` option.
_PW_ALPHABETS = (
    string.ascii_letters,
    string.hexdigits,
    string.ascii_letters + string.digits,
    string.ascii_letters + string.digits + "!@#$%^&*",
)

# Discord's message content limit, used when truncating generated output.
_MAX_MESSAGE_LENGTH = 2000


def _discord_timestamp(epoch: int, style: str = "F") -> str:
    """Return Discord timestamp markup for ``epoch`` (seconds since the Unix epoch).

    ``style`` is the Discord style letter, e.g. ``F`` for long date/time or
    ``R`` for relative time.
    """
    return f"<t:{epoch}:{style}>"


class UtilCog(Cog):
    """
    Utility functions for JARVIS

    Mostly system utility functions, but may change over time
    """

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)

    @slash_command(name="status", description="Retrieve JARVIS status")
    @cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
    async def _status(self, ctx: InteractionContext) -> None:
        """Send an embed with version info, start time and phishing-domain count."""
        title = "JARVIS Status"
        desc = f"All systems online\nConnected to **{len(self.bot.guilds)}** guilds"
        color = "#3498db"
        uptime = int(self.bot.start_time.timestamp())
        num_domains = len(self.bot.phishing_domains)
        fields = [
            EmbedField(name="Version", value=jconst.__version__, inline=True),
            EmbedField(name="naff", value=const.__version__, inline=True),
            EmbedField(name="Git Hash", value=get_repo_hash()[:7], inline=True),
            # Embed field values must be non-empty; render the start time as
            # Discord timestamp markup.
            EmbedField(name="Online Since", value=_discord_timestamp(uptime), inline=False),
            EmbedField(
                name="Phishing Protection", value=f"Detecting {num_domains} phishing domains"
            ),
        ]
        embed = build_embed(title=title, description=desc, fields=fields, color=color)
        await ctx.send(embed=embed)

    @slash_command(
        name="logo",
        description="Get the current logo",
    )
    @cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
    async def _logo(self, ctx: InteractionContext) -> None:
        """Send the JARVIS logo as a PNG attachment."""
        with BytesIO() as image_bytes:
            JARVIS_LOGO.save(image_bytes, "PNG")
            image_bytes.seek(0)
            logo = File(image_bytes, file_name="logo.png")
            # Send while the buffer is still open so the upload can read it.
            await ctx.send(file=logo)

    @slash_command(name="rchk", description="Robot Camo HK416")
    async def _rchk(self, ctx: InteractionContext) -> None:
        """Send the pre-built robot camo HK416 message."""
        await ctx.send(content=hk)

    @slash_command(
        name="rcauto",
        description="Automates robot camo letters",
    )
    @slash_option(
        name="text",
        description="Text to camo-ify",
        opt_type=OptionTypes.STRING,
        required=True,
    )
    async def _rcauto(self, ctx: InteractionContext, text: str) -> None:
        """Convert ``text`` into robot camo emote names and send the result.

        Characters outside the supported set are silently dropped; spaces are
        passed through unchanged.
        """
        upper = text.upper()
        if len(text) == 1 and not _CAMO_SINGLE_RE.match(upper):
            await ctx.send("Please use ASCII characters.", ephemeral=True)
            return

        pieces = []
        for letter in upper:
            if letter == " ":
                pieces.append(" ")
            elif _CAMO_LETTER_RE.match(letter):
                emote_id = emotes[letter]
                pieces.append(f":{names[emote_id]}:")
        to_send = "".join(pieces)

        if len(to_send) > _MAX_MESSAGE_LENGTH:
            await ctx.send("Too long.", ephemeral=True)
        elif not to_send:
            await ctx.send("No valid text found", ephemeral=True)
        else:
            await ctx.send(to_send)

    @slash_command(name="avatar", description="Get a user avatar")
    @slash_option(
        name="user",
        description="User to view avatar of",
        opt_type=OptionTypes.USER,
        required=False,
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=5)
    async def _avatar(self, ctx: InteractionContext, user: User = None) -> None:
        """Send an embed showing the avatar of ``user`` (defaults to the invoker)."""
        if not user:
            user = ctx.author

        avatar = user.avatar.url
        if isinstance(user, Member):
            # Members use their display avatar instead of the account avatar.
            avatar = user.display_avatar.url

        embed = build_embed(title="Avatar", description="", fields=[], color="#00FFEE")
        embed.set_image(url=avatar)
        embed.set_author(name=f"{user.username}#{user.discriminator}", icon_url=avatar)
        await ctx.send(embed=embed)

    @slash_command(
        name="roleinfo",
        description="Get role info",
    )
    @slash_option(
        name="role",
        description="Role to get info of",
        opt_type=OptionTypes.ROLE,
        required=True,
    )
    async def _roleinfo(self, ctx: InteractionContext, role: Role) -> None:
        """Send an embed describing ``role`` with a logo tinted in the role color."""
        created = _discord_timestamp(int(role.created_at.timestamp()))
        fields = [
            EmbedField(name="ID", value=str(role.id), inline=True),
            EmbedField(name="Name", value=role.mention, inline=True),
            EmbedField(name="Color", value=str(role.color.hex), inline=True),
            EmbedField(name="Mention", value=f"`{role.mention}`", inline=True),
            EmbedField(name="Hoisted", value="Yes" if role.hoist else "No", inline=True),
            EmbedField(name="Position", value=str(role.position), inline=True),
            EmbedField(name="Mentionable", value="Yes" if role.mentionable else "No", inline=True),
            EmbedField(name="Member Count", value=str(len(role.members)), inline=True),
            EmbedField(name="Created At", value=created),
        ]
        embed = build_embed(
            title="",
            description="",
            fields=fields,
            color=role.color,
            timestamp=role.created_at,
        )
        embed.set_footer(text="Role Created")
        embed.set_thumbnail(url="attachment://color_show.png")

        # Recolor every non-transparent logo pixel with the role color while
        # leaving the alpha channel untouched.
        data = np.array(JARVIS_LOGO)
        visible = data[..., 3] > 0
        data[..., :-1][visible] = list(role.color.rgb)
        im = Image.fromarray(data)

        with BytesIO() as image_bytes:
            im.save(image_bytes, "PNG")
            image_bytes.seek(0)
            color_show = File(image_bytes, file_name="color_show.png")
            # Send while the buffer is still open so the upload can read it.
            await ctx.send(embed=embed, file=color_show)

    async def _userinfo(self, ctx: InteractionContext, user: User = None) -> None:
        """Send an embed with join/registration dates and roles for ``user``.

        Shared implementation behind the slash command and the context menu.
        ``user`` defaults to the invoking author.
        """
        await ctx.defer()
        if not user:
            user = ctx.author

        member = await ctx.guild.fetch_member(user.id)
        if not member:
            await ctx.send("That user isn't in this guild.", ephemeral=True)
            return
        # Use the guild member from here on: roles, join date and display name
        # are read below, and a plain User passed in may not carry them.
        user = member

        user_roles = user.roles
        if user_roles:
            user_roles = sorted(user.roles, key=lambda x: -x.position)

        fields = [
            EmbedField(
                name="Joined",
                value=_discord_timestamp(int(user.joined_at.timestamp())),
            ),
            EmbedField(
                name="Registered",
                value=_discord_timestamp(int(user.created_at.timestamp())),
            ),
            EmbedField(
                name=f"Roles [{len(user_roles)}]",
                value=" ".join([x.mention for x in user_roles]) if user_roles else "None",
                inline=False,
            ),
        ]

        embed = build_embed(
            title="",
            description=user.mention,
            fields=fields,
            color=str(user_roles[0].color) if user_roles else "#3498db",
        )
        embed.set_author(
            name=f"{user.display_name}#{user.discriminator}", icon_url=user.display_avatar.url
        )
        embed.set_thumbnail(url=user.display_avatar.url)
        embed.set_footer(text=f"ID: {user.id}")
        await ctx.send(embed=embed)

    @slash_command(
        name="userinfo",
        description="Get user info",
    )
    @slash_option(
        name="user",
        description="User to get info of",
        opt_type=OptionTypes.USER,
        required=False,
    )
    async def _userinfo_slsh(self, ctx: InteractionContext, user: User = None) -> None:
        """Slash-command entry point for user info."""
        await self._userinfo(ctx, user)

    @context_menu(name="User Info", context_type=CommandTypes.USER)
    async def _userinfo_menu(self, ctx: InteractionContext) -> None:
        """Context-menu entry point for user info on the targeted user."""
        await self._userinfo(ctx, ctx.target)

    @slash_command(name="serverinfo", description="Get server info")
    async def _server_info(self, ctx: InteractionContext) -> None:
        """Send an embed summarising the current guild."""
        guild: Guild = ctx.guild

        owner = await guild.fetch_owner()
        owner = f"{owner.username}#{owner.discriminator}" if owner else "||`[redacted]`||"

        categories = sum(isinstance(x, GuildCategory) for x in guild.channels)
        text_channels = sum(isinstance(x, GuildText) for x in guild.channels)
        voice_channels = sum(isinstance(x, GuildVoice) for x in guild.channels)
        threads = len(guild.threads)

        members = guild.member_count
        roles = len(guild.roles)
        role_list = sorted(guild.roles, key=lambda x: x.position, reverse=True)
        role_list = ", ".join(role.mention for role in role_list)

        fields = [
            EmbedField(name="Owner", value=owner, inline=True),
            EmbedField(name="Channel Categories", value=str(categories), inline=True),
            EmbedField(name="Text Channels", value=str(text_channels), inline=True),
            EmbedField(name="Voice Channels", value=str(voice_channels), inline=True),
            EmbedField(name="Threads", value=str(threads), inline=True),
            EmbedField(name="Members", value=str(members), inline=True),
            EmbedField(name="Roles", value=str(roles), inline=True),
            EmbedField(
                name="Created At",
                value=_discord_timestamp(int(guild.created_at.timestamp())),
            ),
        ]
        # Only include the role list when it fits the 1024-character check
        # applied here.
        if len(role_list) < 1024:
            fields.append(EmbedField(name="Role List", value=role_list, inline=False))

        embed = build_embed(title="", description="", fields=fields, timestamp=guild.created_at)

        # A guild may have no icon set; avoid dereferencing a missing icon.
        icon_url = guild.icon.url if guild.icon else None
        embed.set_author(name=guild.name, icon_url=icon_url)
        if icon_url:
            embed.set_thumbnail(url=icon_url)
        embed.set_footer(text=f"ID: {guild.id} | Server Created")

        await ctx.send(embed=embed)

    @slash_command(
        name="pw",
        sub_cmd_name="gen",
        description="Generate a secure password",
        scopes=[862402786116763668],
    )
    @slash_option(
        name="length",
        description="Password length (default 32)",
        opt_type=OptionTypes.INTEGER,
        required=False,
    )
    @slash_option(
        name="chars",
        description="Characters to include (default last option)",
        opt_type=OptionTypes.INTEGER,
        required=False,
        choices=[
            SlashCommandChoice(name="A-Za-z", value=0),
            SlashCommandChoice(name="A-Fa-f0-9", value=1),
            SlashCommandChoice(name="A-Za-z0-9", value=2),
            SlashCommandChoice(name="A-Za-z0-9!@#$%^&*", value=3),
        ],
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=15)
    async def _pw_gen(self, ctx: InteractionContext, length: int = 32, chars: int = 3) -> None:
        """Generate a random password and send it ephemerally.

        ``length`` must be between 1 and 256. ``chars`` selects the alphabet
        by index into the choices declared on the option.
        """
        if length > 256:
            await ctx.send("Please limit password to 256 characters", ephemeral=True)
            return
        if length < 1:
            # A zero or negative length would otherwise yield an empty password.
            await ctx.send("Password length must be at least 1", ephemeral=True)
            return

        alphabet = _PW_ALPHABETS[chars]
        pw = "".join(secrets.choice(alphabet) for _ in range(length))
        await ctx.send(
            f"Generated password:\n`{pw}`\n\n"
            '**WARNING: Once you press "Dismiss Message", '
            "*the password is lost forever***",
            ephemeral=True,
        )

    @slash_command(name="pigpen", description="Encode a string into pigpen")
    @slash_option(
        name="text", description="Text to encode", opt_type=OptionTypes.STRING, required=True
    )
    async def _pigpen(self, ctx: InteractionContext, text: str) -> None:
        """Encode ``text`` via ``pigpen.lookup`` and send it as inline code.

        Characters are lower-cased; those without a lookup entry are kept as-is,
        and backticks are dropped so they cannot close the code span early.
        """
        pieces = []
        for char in text.lower():
            if char in pigpen.lookup:
                pieces.append(pigpen.lookup[char])
            elif char != "`":
                pieces.append(char)
        body = "".join(piece + " " for piece in pieces)
        # Truncate the body rather than the finished message so the closing
        # backtick always survives.
        await ctx.send(f"`{body[:_MAX_MESSAGE_LENGTH - 2]}`")

    @slash_command(
        name="timestamp", description="Convert a datetime or timestamp into it's counterpart"
    )
    @slash_option(
        name="string", description="String to convert", opt_type=OptionTypes.STRING, required=True
    )
    @slash_option(
        name="private", description="Respond quietly?", opt_type=OptionTypes.BOOLEAN, required=False
    )
    async def _timestamp(self, ctx: InteractionContext, string: str, private: bool = False) -> None:
        """Parse ``string`` as a date/time and send its epoch and Discord renderings.

        Naive results are interpreted in the bot host's local zone and
        converted to UTC. ``private`` makes the response ephemeral.
        """
        timestamp = parse(string)
        if not timestamp:
            await ctx.send("Valid time not found, try again", ephemeral=True)
            return

        if not timestamp.tzinfo:
            timestamp = timestamp.replace(tzinfo=get_localzone()).astimezone(tz=timezone.utc)

        timestamp_utc = timestamp.astimezone(tz=timezone.utc)

        ts = int(timestamp.timestamp())
        ts_utc = int(timestamp_utc.timestamp())
        absolute = _discord_timestamp(ts_utc, "F")
        relative = _discord_timestamp(ts_utc, "R")
        fields = [
            EmbedField(name="Unix Epoch", value=f"`{ts}`"),
            EmbedField(name="Unix Epoch (UTC)", value=f"`{ts_utc}`"),
            # Show both the rendered timestamp and the raw markup to copy.
            EmbedField(name="Absolute Time", value=f"{absolute}\n`{absolute}`"),
            EmbedField(name="Relative Time", value=f"{relative}\n`{relative}`"),
            EmbedField(name="ISO8601", value=timestamp.isoformat()),
        ]
        embed = build_embed(title="Converted Time", description=f"`{string}`", fields=fields)
        await ctx.send(embed=embed, ephemeral=private)


def setup(bot: Client) -> None:
    """Add UtilCog to JARVIS"""
    UtilCog(bot)