"""JARVIS Developer Cog.""" import base64 import hashlib import logging import re import subprocess # noqa: S404 import uuid as uuidpy import ulid as ulidpy from bson import ObjectId from jarvis_core.filters import invites, url from jarvis_core.util import convert_bytesize, hash from jarvis_core.util.http import get_size from naff import Client, Cog, InteractionContext from naff.models.discord.embed import EmbedField from naff.models.discord.message import Attachment from naff.models.naff.application_commands import ( OptionTypes, SlashCommandChoice, slash_command, slash_option, ) from naff.models.naff.command import cooldown from naff.models.naff.cooldowns import Buckets from jarvis.utils import build_embed supported_hashes = {x for x in hashlib.algorithms_guaranteed if "shake" not in x} OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$") URL_VERIFY = re.compile( r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+" ) DN_VERIFY = re.compile( r"^(?:(?PCN=(?P[^,]*)),)?(?:(?P(?:(?:CN|OU)=[^,]+,?)+),)?(?P(?:DC=[^,]+,?)+)$" # noqa: E501 ) ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", re.IGNORECASE) UUID_VERIFY = re.compile( r"[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$", re.IGNORECASE, ) UUID_GET = {3: uuidpy.uuid3, 5: uuidpy.uuid5} MAX_FILESIZE = 5 * (1024**3) # 5GB class DevCog(Cog): """JARVIS Developer Cog.""" def __init__(self, bot: Client): self.bot = bot self.logger = logging.getLogger(__name__) @slash_command(name="hash", description="Hash some data") @slash_option( name="method", description="Hash method", opt_type=OptionTypes.STRING, required=True, choices=[SlashCommandChoice(name=x, value=x) for x in supported_hashes], ) @slash_option( name="data", description="Data to hash", opt_type=OptionTypes.STRING, required=False, ) @slash_option( name="attach", description="File to hash", opt_type=OptionTypes.ATTACHMENT, required=False ) @cooldown(bucket=Buckets.USER, rate=1, interval=2) async def _hash( self, ctx: InteractionContext, method: str, data: str = None, attach: Attachment = None ) -> None: if not data and not attach: await ctx.send( "No data to hash", ephemeral=True, ) return if data and invites.match(data): await ctx.send("No hashing invites", ephemeral=True) return title = data if attach: data = attach.url title = attach.filename elif url.match(data): try: if (size := await get_size(data)) > MAX_FILESIZE: await ctx.send("Please hash files that are <= 5GB in size", ephemeral=True) self.logger.debug(f"Refused to hash file of size {convert_bytesize(size)}") return except Exception as e: await ctx.send(f"Failed to retrieve URL: ```\n{e}\n```", ephemeral=True) return title = data.split("/")[-1] await ctx.defer() try: hexstr, size, c_type = await hash(data, method) except Exception as e: await ctx.send(f"Failed to hash data: ```\n{e}\n```", ephemeral=True) return data_size = convert_bytesize(size) description = "Hashed using " + method fields = [ EmbedField("Content Type", c_type, False), EmbedField("Data Size", data_size, False), EmbedField("Hash", f"`{hexstr}`", False), ] embed = build_embed(title=title, description=description, fields=fields) await ctx.send(embed=embed) @slash_command(name="uuid", description="Generate a UUID") @slash_option( name="version", description="UUID version", opt_type=OptionTypes.STRING, required=True, choices=[SlashCommandChoice(name=x, value=x) for x in ["3", "4", "5"]], ) @slash_option( name="data", description="Data for UUID version 3,5", opt_type=OptionTypes.STRING, required=False, ) async def _uuid(self, ctx: InteractionContext, version: str, data: str = None) -> None: version = int(version) if version in [3, 5] and not data: await ctx.send(f"UUID{version} requires data.", ephemeral=True) return if version == 4: await ctx.send(f"UUID4: `{uuidpy.uuid4()}`") else: to_send = None if OID_VERIFY.match(data): to_send = UUID_GET[version](uuidpy.NAMESPACE_OID, data) elif URL_VERIFY.match(data): to_send = UUID_GET[version](uuidpy.NAMESPACE_URL, data) elif DN_VERIFY.match(data): to_send = UUID_GET[version](uuidpy.NAMESPACE_X500, data) else: to_send = UUID_GET[version](uuidpy.NAMESPACE_DNS, data) await ctx.send(f"UUID{version}: `{to_send}`") @slash_command( name="objectid", description="Generate an ObjectID", ) @cooldown(bucket=Buckets.USER, rate=1, interval=2) async def _objectid(self, ctx: InteractionContext) -> None: await ctx.send(f"ObjectId: `{str(ObjectId())}`") @slash_command( name="ulid", description="Generate a ULID", ) @cooldown(bucket=Buckets.USER, rate=1, interval=2) async def _ulid(self, ctx: InteractionContext) -> None: await ctx.send(f"ULID: `{ulidpy.new().str}`") @slash_command( name="uuid2ulid", description="Convert a UUID to a ULID", ) @slash_option( name="uuid", description="UUID to convert", opt_type=OptionTypes.STRING, required=True ) @cooldown(bucket=Buckets.USER, rate=1, interval=2) async def _uuid2ulid(self, ctx: InteractionContext, uuid: str) -> None: if UUID_VERIFY.match(uuid): u = ulidpy.parse(uuid) await ctx.send(f"ULID: `{u.str}`") else: await ctx.send("Invalid UUID") @slash_command( name="ulid2uuid", description="Convert a ULID to a UUID", ) @slash_option( name="ulid", description="ULID to convert", opt_type=OptionTypes.STRING, required=True ) @cooldown(bucket=Buckets.USER, rate=1, interval=2) async def _ulid2uuid(self, ctx: InteractionContext, ulid: str) -> None: if ULID_VERIFY.match(ulid): ulid = ulidpy.parse(ulid) await ctx.send(f"UUID: `{ulid.uuid}`") else: await ctx.send("Invalid ULID.") base64_methods = ["b64", "b16", "b32", "a85", "b85"] @slash_command(name="encode", description="Encode some data") @slash_option( name="method", description="Encode method", opt_type=OptionTypes.STRING, required=True, choices=[SlashCommandChoice(name=x, value=x) for x in base64_methods], ) @slash_option( name="data", description="Data to encode", opt_type=OptionTypes.STRING, required=True, ) async def _encode(self, ctx: InteractionContext, method: str, data: str) -> None: if invites.search(data): await ctx.send( "Please don't use this to bypass invite restrictions", ephemeral=True, ) return mstr = method method = getattr(base64, method + "encode") try: encoded = method(data.encode("UTF-8")).decode("UTF-8") except Exception as e: await ctx.send(f"Failed to encode data: {e}") return fields = [ EmbedField(name="Plaintext", value=f"`{data}`", inline=False), EmbedField(name=mstr, value=f"`{encoded}`", inline=False), ] embed = build_embed(title="Decoded Data", description="", fields=fields) await ctx.send(embed=embed) @slash_command(name="decode", description="Decode some data") @slash_option( name="method", description="Decode method", opt_type=OptionTypes.STRING, required=True, choices=[SlashCommandChoice(name=x, value=x) for x in base64_methods], ) @slash_option( name="data", description="Data to encode", opt_type=OptionTypes.STRING, required=True, ) async def _decode(self, ctx: InteractionContext, method: str, data: str) -> None: mstr = method method = getattr(base64, method + "decode") try: decoded = method(data.encode("UTF-8")).decode("UTF-8") except Exception as e: await ctx.send(f"Failed to decode data: {e}") return if invites.search(decoded): await ctx.send( "Please don't use this to bypass invite restrictions", ephemeral=True, ) return fields = [ EmbedField(name="Plaintext", value=f"`{data}`", inline=False), EmbedField(name=mstr, value=f"`{decoded}`", inline=False), ] embed = build_embed(title="Decoded Data", description="", fields=fields) await ctx.send(embed=embed) @slash_command(name="cloc", description="Get JARVIS lines of code") @cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30) async def _cloc(self, ctx: InteractionContext) -> None: output = subprocess.check_output( # noqa: S603, S607 ["tokei", "-C", "--sort", "code"] ).decode("UTF-8") await ctx.send(f"```haskell\n{output}\n```") def setup(bot: Client) -> None: """Add DevCog to JARVIS""" DevCog(bot)