jarvis-bot/jarvis/cogs/dev.py
2021-08-06 20:24:50 -06:00

253 lines
8.3 KiB
Python

"""J.A.R.V.I.S. Developer Cog."""
import base64
import hashlib
import re
import subprocess # noqa: S404
import uuid as uuidpy
from typing import Any
from typing import Union
import ulid as ulidpy
from bson import ObjectId
from discord.ext import commands
from discord_slash import cog_ext
from discord_slash import SlashContext
from discord_slash.utils.manage_commands import create_choice
from discord_slash.utils.manage_commands import create_option
from jarvis.utils import build_embed
from jarvis.utils import convert_bytesize
from jarvis.utils.field import Field
# Hash algorithms offered to users: everything hashlib guarantees except the
# SHAKE family, whose hexdigest() requires an explicit length argument.
supported_hashes = {x for x in hashlib.algorithms_guaranteed if "shake" not in x}

# Dotted numeric OID with 6 to 14 components, each either 0 or 1-4 digits
# without a leading zero.
OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$")

# http:// or https:// URL made of URL-safe characters or %XX escapes.
URL_VERIFY = re.compile(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")

# Distinguished name: optional leading CN, optional CN/OU path, then one or
# more DC components.
DN_VERIFY = re.compile(
    r"^(?:(?P<cn>CN=(?P<name>[^,]*)),)?(?:(?P<path>(?:(?:CN|OU)=[^,]+,?)+),)?(?P<domain>(?:DC=[^,]+,?)+)$"
)

# 26 alphanumeric characters, case-insensitive. This is only a shape check:
# it also admits letters outside the ULID alphabet.
ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", re.IGNORECASE)

# Hyphenated UUID with version digit 1-5 and variant 8/9/a/b. There is no
# leading anchor, so use it with .match() to pin the start of the string.
UUID_VERIFY = re.compile(
    r"[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
    re.IGNORECASE,
)

# Discord invite links; group 1 captures the invite code. The dots are
# escaped so that only literal "." separators match -- unescaped, "." matched
# any character and flagged unrelated text such as "discordXgg/abc".
invites = re.compile(
    r"(?:https?://)?(?:www\.)?(?:discord\.(?:gg|io|me|li)|discord(?:app)?\.com/invite)/([^\s/]+?)(?=\b)",
    flags=re.IGNORECASE,
)

# Name-based UUID constructors keyed by UUID version number.
UUID_GET = {3: uuidpy.uuid3, 5: uuidpy.uuid5}
def hash_obj(hash: Any, data: Union[str, bytes], text: bool = True) -> str:
    """Feed ``data`` into ``hash`` and return the hexadecimal digest.

    Args:
        hash: Hash object exposing ``update()`` and ``hexdigest()``.
        data: Payload to hash; a ``str`` when ``text`` is true, otherwise bytes.
        text: When true, ``data`` is encoded as UTF-8 and hashed in one call.
            When false, ``data`` is fed to the hash in 64 KiB slices.

    Returns:
        The hex digest of ``hash`` after it has consumed ``data``.
    """
    if not text:
        chunk_size = 65536
        # Walk the buffer in fixed-size strides; the last slice may be shorter.
        for offset in range(0, len(data), chunk_size):
            hash.update(data[offset : offset + chunk_size])  # noqa: E203
        return hash.hexdigest()

    encoded = data.encode("UTF-8")
    hash.update(encoded)
    return hash.hexdigest()
class DevCog(commands.Cog):
    """J.A.R.V.I.S. Developer Cog.

    Slash commands for hashing text, generating and converting identifiers
    (UUID, ObjectId, ULID), base-N encoding/decoding, and reporting the
    bot's line counts.
    """

    def __init__(self, bot: commands.Bot):
        """Keep a reference to the bot this cog is attached to."""
        self.bot = bot

    @cog_ext.cog_slash(
        name="hash",
        description="Hash some data",
        options=[
            create_option(
                name="method",
                description="Hash method",
                option_type=3,
                required=True,
                choices=[create_choice(name=x, value=x) for x in supported_hashes],
            ),
            create_option(
                name="data",
                description="Data to hash",
                option_type=3,
                required=True,
            ),
        ],
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _hash(self, ctx: SlashContext, method: str, data: str) -> None:
        """Hash ``data`` with the ``method`` algorithm and reply with an embed.

        Sends a hidden notice and returns early when ``data`` is empty.
        """
        if not data:
            await ctx.send(
                "No data to hash",
                hidden=True,
            )
            return
        # Default to sha256, just in case
        hasher = getattr(hashlib, method, hashlib.sha256)()
        # The slash command only ever supplies text, so hash it as text.
        digest = hash_obj(hasher, data, True)
        # Measure the UTF-8 bytes that were actually hashed; len(data) counts
        # characters and under-reports non-ASCII input.
        # NOTE(review): assumes convert_bytesize takes a byte count -- confirm.
        data_size = convert_bytesize(len(data.encode("UTF-8")))
        description = "Hashed using " + method
        fields = [
            Field("Data Size", data_size, False),
            Field("Hash", f"`{digest}`", False),
        ]
        embed = build_embed(title=data, description=description, fields=fields)
        await ctx.send(embed=embed)

    @cog_ext.cog_slash(
        name="uuid",
        description="Generate a UUID",
        options=[
            create_option(
                name="version",
                description="UUID version",
                option_type=3,
                required=True,
                choices=[create_choice(name=x, value=x) for x in ["3", "4", "5"]],
            ),
            create_option(
                name="data",
                description="Data for UUID version 3,5",
                option_type=3,
                required=False,
            ),
        ],
    )
    async def _uuid(self, ctx: SlashContext, version: str, data: str = None) -> None:
        """Generate a version 3, 4 or 5 UUID.

        Versions 3 and 5 are name-based and require ``data``. The namespace
        is picked from the shape of ``data``: OID, URL, X.500 distinguished
        name, and DNS as the fallback.
        """
        version = int(version)
        if version in [3, 5] and not data:
            await ctx.send(f"UUID{version} requires data.", hidden=True)
            return
        if version == 4:
            await ctx.send(f"UUID4: `{uuidpy.uuid4()}`")
            return

        if OID_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_OID
        elif URL_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_URL
        elif DN_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_X500
        else:
            namespace = uuidpy.NAMESPACE_DNS
        to_send = UUID_GET[version](namespace, data)
        await ctx.send(f"UUID{version}: `{to_send}`")

    @cog_ext.cog_slash(
        name="objectid",
        description="Generate an ObjectID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _objectid(self, ctx: SlashContext) -> None:
        """Reply with a freshly generated BSON ObjectId."""
        await ctx.send(f"ObjectId: `{str(ObjectId())}`")

    @cog_ext.cog_slash(
        name="ulid",
        description="Generate a ULID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _ulid(self, ctx: SlashContext) -> None:
        """Reply with a freshly generated ULID."""
        await ctx.send(f"ULID: `{ulidpy.new().str}`")

    @cog_ext.cog_slash(
        name="uuid2ulid",
        description="Convert a UUID to a ULID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _uuid2ulid(self, ctx: SlashContext, uuid: str) -> None:
        """Convert a hyphenated UUID string to its ULID representation."""
        if UUID_VERIFY.match(uuid):
            u = ulidpy.parse(uuid)
            await ctx.send(f"ULID: `{u.str}`")
        else:
            await ctx.send("Invalid UUID")

    @cog_ext.cog_slash(
        name="ulid2uuid",
        description="Convert a ULID to a UUID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _ulid2uuid(self, ctx: SlashContext, ulid: str) -> None:
        """Convert a ULID string to its UUID representation."""
        if not ULID_VERIFY.match(ulid):
            await ctx.send("Invalid ULID.")
            return
        try:
            parsed = ulidpy.parse(ulid)
        except ValueError:
            # ULID_VERIFY only checks length and alphanumerics; the parser
            # can still reject the value (e.g. letters outside the ULID
            # alphabet). Treat that as invalid input instead of crashing.
            await ctx.send("Invalid ULID.")
            return
        await ctx.send(f"UUID: `{parsed.uuid}`")

    # Prefixes of the base64 module's "<prefix>encode" / "<prefix>decode"
    # functions that the encode and decode commands expose.
    base64_methods = ["b64", "b16", "b32", "a85", "b85"]

    @cog_ext.cog_slash(
        name="encode",
        description="Encode some data",
        options=[
            create_option(
                name="method",
                description="Encode method",
                option_type=3,
                required=True,
                choices=[create_choice(name=x, value=x) for x in base64_methods],
            ),
            create_option(
                name="data",
                description="Data to encode",
                option_type=3,
                required=True,
            ),
        ],
    )
    async def _encode(self, ctx: SlashContext, method: str, data: str) -> None:
        """Encode ``data`` (as UTF-8) with the chosen base64-module encoder."""
        encoder = getattr(base64, method + "encode")
        await ctx.send(f"`{encoder(data.encode('UTF-8')).decode('UTF-8')}`")

    @cog_ext.cog_slash(
        name="decode",
        description="Decode some data",
        options=[
            create_option(
                name="method",
                description="Decode method",
                option_type=3,
                required=True,
                choices=[create_choice(name=x, value=x) for x in base64_methods],
            ),
            create_option(
                name="data",
                description="Data to decode",
                option_type=3,
                required=True,
            ),
        ],
    )
    async def _decode(self, ctx: SlashContext, method: str, data: str) -> None:
        """Decode ``data`` with the chosen base64-module decoder.

        Replies with a hidden error when the input is malformed or does not
        decode to UTF-8 text, and refuses to echo decoded Discord invites.
        """
        decoder = getattr(base64, method + "decode")
        try:
            decoded = decoder(data.encode("UTF-8")).decode("UTF-8")
        except ValueError:
            # binascii.Error (bad padding/alphabet) and UnicodeDecodeError
            # (payload is not UTF-8 text) are both ValueError subclasses.
            await ctx.send(
                f"Unable to decode data using {method}",
                hidden=True,
            )
            return
        if invites.search(decoded):
            await ctx.send(
                "Please don't use this to bypass invite restrictions",
                hidden=True,
            )
            return
        await ctx.send(f"`{decoded}`")

    @cog_ext.cog_slash(
        name="cloc",
        description="Get J.A.R.V.I.S. lines of code",
    )
    @commands.cooldown(1, 30, commands.BucketType.channel)
    async def _cloc(self, ctx: SlashContext) -> None:
        """Run ``tokei`` in the working directory and post its report."""
        # NOTE: check_output is synchronous, so the coroutine does not yield
        # while tokei runs.
        output = subprocess.check_output(["tokei", "-C", "--sort", "code"]).decode("UTF-8")  # noqa: S603, S607
        await ctx.send(f"```\n{output}\n```")
def setup(bot: commands.Bot) -> None:
    """Register a DevCog instance on the given bot."""
    dev_cog = DevCog(bot)
    bot.add_cog(dev_cog)