280 lines
9.6 KiB
Python
280 lines
9.6 KiB
Python
"""JARVIS Developer Cog."""
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import re
|
|
import subprocess # noqa: S404
|
|
import uuid as uuidpy
|
|
|
|
import ulid as ulidpy
|
|
from bson import ObjectId
|
|
from dis_snek import InteractionContext, Scale, Snake
|
|
from dis_snek.models.discord.embed import EmbedField
|
|
from dis_snek.models.discord.message import Attachment
|
|
from dis_snek.models.snek.application_commands import (
|
|
OptionTypes,
|
|
SlashCommandChoice,
|
|
slash_command,
|
|
slash_option,
|
|
)
|
|
from dis_snek.models.snek.command import cooldown
|
|
from dis_snek.models.snek.cooldowns import Buckets
|
|
from jarvis_core.filters import invites, url
|
|
from jarvis_core.util import convert_bytesize, hash
|
|
from jarvis_core.util.http import get_size
|
|
|
|
from jarvis.utils import build_embed
|
|
|
|
supported_hashes = {x for x in hashlib.algorithms_guaranteed if "shake" not in x}
|
|
|
|
OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$")
|
|
URL_VERIFY = re.compile(
|
|
r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
|
|
)
|
|
DN_VERIFY = re.compile(
|
|
r"^(?:(?P<cn>CN=(?P<name>[^,]*)),)?(?:(?P<path>(?:(?:CN|OU)=[^,]+,?)+),)?(?P<domain>(?:DC=[^,]+,?)+)$" # noqa: E501
|
|
)
|
|
ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", re.IGNORECASE)
|
|
UUID_VERIFY = re.compile(
|
|
r"[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
UUID_GET = {3: uuidpy.uuid3, 5: uuidpy.uuid5}
|
|
|
|
MAX_FILESIZE = 5 * (1024**3) # 5GB
|
|
|
|
|
|
class DevCog(Scale):
|
|
"""JARVIS Developer Cog."""
|
|
|
|
def __init__(self, bot: Snake):
|
|
self.bot = bot
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
@slash_command(name="hash", description="Hash some data")
|
|
@slash_option(
|
|
name="method",
|
|
description="Hash method",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
choices=[SlashCommandChoice(name=x, value=x) for x in supported_hashes],
|
|
)
|
|
@slash_option(
|
|
name="data",
|
|
description="Data to hash",
|
|
opt_type=OptionTypes.STRING,
|
|
required=False,
|
|
)
|
|
@slash_option(
|
|
name="attach", description="File to hash", opt_type=OptionTypes.ATTACHMENT, required=False
|
|
)
|
|
@cooldown(bucket=Buckets.USER, rate=1, interval=2)
|
|
async def _hash(
|
|
self, ctx: InteractionContext, method: str, data: str = None, attach: Attachment = None
|
|
) -> None:
|
|
if not data and not attach:
|
|
await ctx.send(
|
|
"No data to hash",
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
if data and invites.match(data):
|
|
await ctx.send("No hashing invites", ephemeral=True)
|
|
return
|
|
title = data
|
|
if attach:
|
|
data = attach.url
|
|
title = attach.filename
|
|
elif url.match(data):
|
|
try:
|
|
if (size := await get_size(data)) > MAX_FILESIZE:
|
|
await ctx.send("Please hash files that are <= 5GB in size", ephemeral=True)
|
|
self.logger.debug(f"Refused to hash file of size {convert_bytesize(size)}")
|
|
return
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to retrieve URL: ```\n{e}\n```", ephemeral=True)
|
|
return
|
|
title = data.split("/")[-1]
|
|
|
|
await ctx.defer()
|
|
try:
|
|
hexstr, size, c_type = await hash(data, method)
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to hash data: ```\n{e}\n```", ephemeral=True)
|
|
return
|
|
|
|
data_size = convert_bytesize(size)
|
|
description = "Hashed using " + method
|
|
fields = [
|
|
EmbedField("Content Type", c_type, False),
|
|
EmbedField("Data Size", data_size, False),
|
|
EmbedField("Hash", f"`{hexstr}`", False),
|
|
]
|
|
|
|
embed = build_embed(title=title, description=description, fields=fields)
|
|
await ctx.send(embed=embed)
|
|
|
|
@slash_command(name="uuid", description="Generate a UUID")
|
|
@slash_option(
|
|
name="version",
|
|
description="UUID version",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
choices=[SlashCommandChoice(name=x, value=x) for x in ["3", "4", "5"]],
|
|
)
|
|
@slash_option(
|
|
name="data",
|
|
description="Data for UUID version 3,5",
|
|
opt_type=OptionTypes.STRING,
|
|
required=False,
|
|
)
|
|
async def _uuid(self, ctx: InteractionContext, version: str, data: str = None) -> None:
|
|
version = int(version)
|
|
if version in [3, 5] and not data:
|
|
await ctx.send(f"UUID{version} requires data.", ephemeral=True)
|
|
return
|
|
if version == 4:
|
|
await ctx.send(f"UUID4: `{uuidpy.uuid4()}`")
|
|
else:
|
|
to_send = None
|
|
if OID_VERIFY.match(data):
|
|
to_send = UUID_GET[version](uuidpy.NAMESPACE_OID, data)
|
|
elif URL_VERIFY.match(data):
|
|
to_send = UUID_GET[version](uuidpy.NAMESPACE_URL, data)
|
|
elif DN_VERIFY.match(data):
|
|
to_send = UUID_GET[version](uuidpy.NAMESPACE_X500, data)
|
|
else:
|
|
to_send = UUID_GET[version](uuidpy.NAMESPACE_DNS, data)
|
|
await ctx.send(f"UUID{version}: `{to_send}`")
|
|
|
|
@slash_command(
|
|
name="objectid",
|
|
description="Generate an ObjectID",
|
|
)
|
|
@cooldown(bucket=Buckets.USER, rate=1, interval=2)
|
|
async def _objectid(self, ctx: InteractionContext) -> None:
|
|
await ctx.send(f"ObjectId: `{str(ObjectId())}`")
|
|
|
|
@slash_command(
|
|
name="ulid",
|
|
description="Generate a ULID",
|
|
)
|
|
@cooldown(bucket=Buckets.USER, rate=1, interval=2)
|
|
async def _ulid(self, ctx: InteractionContext) -> None:
|
|
await ctx.send(f"ULID: `{ulidpy.new().str}`")
|
|
|
|
@slash_command(
|
|
name="uuid2ulid",
|
|
description="Convert a UUID to a ULID",
|
|
)
|
|
@slash_option(
|
|
name="uuid", description="UUID to convert", opt_type=OptionTypes.STRING, required=True
|
|
)
|
|
@cooldown(bucket=Buckets.USER, rate=1, interval=2)
|
|
async def _uuid2ulid(self, ctx: InteractionContext, uuid: str) -> None:
|
|
if UUID_VERIFY.match(uuid):
|
|
u = ulidpy.parse(uuid)
|
|
await ctx.send(f"ULID: `{u.str}`")
|
|
else:
|
|
await ctx.send("Invalid UUID")
|
|
|
|
@slash_command(
|
|
name="ulid2uuid",
|
|
description="Convert a ULID to a UUID",
|
|
)
|
|
@slash_option(
|
|
name="ulid", description="ULID to convert", opt_type=OptionTypes.STRING, required=True
|
|
)
|
|
@cooldown(bucket=Buckets.USER, rate=1, interval=2)
|
|
async def _ulid2uuid(self, ctx: InteractionContext, ulid: str) -> None:
|
|
if ULID_VERIFY.match(ulid):
|
|
ulid = ulidpy.parse(ulid)
|
|
await ctx.send(f"UUID: `{ulid.uuid}`")
|
|
else:
|
|
await ctx.send("Invalid ULID.")
|
|
|
|
base64_methods = ["b64", "b16", "b32", "a85", "b85"]
|
|
|
|
@slash_command(name="encode", description="Encode some data")
|
|
@slash_option(
|
|
name="method",
|
|
description="Encode method",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
choices=[SlashCommandChoice(name=x, value=x) for x in base64_methods],
|
|
)
|
|
@slash_option(
|
|
name="data",
|
|
description="Data to encode",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
)
|
|
async def _encode(self, ctx: InteractionContext, method: str, data: str) -> None:
|
|
if invites.search(data):
|
|
await ctx.send(
|
|
"Please don't use this to bypass invite restrictions",
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
mstr = method
|
|
method = getattr(base64, method + "encode")
|
|
try:
|
|
encoded = method(data.encode("UTF-8")).decode("UTF-8")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to encode data: {e}")
|
|
return
|
|
fields = [
|
|
EmbedField(name="Plaintext", value=f"`{data}`", inline=False),
|
|
EmbedField(name=mstr, value=f"`{encoded}`", inline=False),
|
|
]
|
|
embed = build_embed(title="Decoded Data", description="", fields=fields)
|
|
await ctx.send(embed=embed)
|
|
|
|
@slash_command(name="decode", description="Decode some data")
|
|
@slash_option(
|
|
name="method",
|
|
description="Decode method",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
choices=[SlashCommandChoice(name=x, value=x) for x in base64_methods],
|
|
)
|
|
@slash_option(
|
|
name="data",
|
|
description="Data to encode",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
)
|
|
async def _decode(self, ctx: InteractionContext, method: str, data: str) -> None:
|
|
mstr = method
|
|
method = getattr(base64, method + "decode")
|
|
try:
|
|
decoded = method(data.encode("UTF-8")).decode("UTF-8")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to decode data: {e}")
|
|
return
|
|
if invites.search(decoded):
|
|
await ctx.send(
|
|
"Please don't use this to bypass invite restrictions",
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
fields = [
|
|
EmbedField(name="Plaintext", value=f"`{data}`", inline=False),
|
|
EmbedField(name=mstr, value=f"`{decoded}`", inline=False),
|
|
]
|
|
embed = build_embed(title="Decoded Data", description="", fields=fields)
|
|
await ctx.send(embed=embed)
|
|
|
|
@slash_command(name="cloc", description="Get JARVIS lines of code")
|
|
@cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
|
|
async def _cloc(self, ctx: InteractionContext) -> None:
|
|
output = subprocess.check_output( # noqa: S603, S607
|
|
["tokei", "-C", "--sort", "code"]
|
|
).decode("UTF-8")
|
|
await ctx.send(f"```\n{output}\n```")
|
|
|
|
|
|
def setup(bot: Snake) -> None:
|
|
"""Add DevCog to JARVIS"""
|
|
DevCog(bot)
|