jarvis-bot/jarvis/cogs/dev.py

277 lines
8.3 KiB
Python

import base64
import hashlib
import re
import subprocess
import uuid as uuidpy
import ulid as ulidpy
from bson import ObjectId
from discord.ext import commands
from discord_slash import cog_ext
from discord_slash.utils.manage_commands import create_choice, create_option
from jarvis.utils import build_embed, convert_bytesize
from jarvis.utils.field import Field
# Hash algorithm names offered to users: everything hashlib guarantees on
# every platform, minus any algorithm whose name contains "shake"
# (presumably because SHAKE digests need an explicit length -- confirm).
supported_hashes = set(
    filter(lambda name: "shake" not in name, hashlib.algorithms_guaranteed)
)
# Dotted-decimal OID: 6 to 14 components, each either "0" or a number of
# up to four digits with no leading zero.
OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$")

# http:// or https:// followed by at least one URL-ish character or
# %XX escape.
URL_VERIFY = re.compile(
    r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]"
    r"|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)

# Distinguished name: optional leading CN, optional CN/OU path, then one
# or more DC components. Exposes the groups cn, name, path and domain.
DN_VERIFY = re.compile(
    r"^(?:(?P<cn>CN=(?P<name>[^,]*)),)"
    r"?(?:(?P<path>(?:(?:CN|OU)=[^,]+,?)+),)"
    r"?(?P<domain>(?:DC=[^,]+,?)+)$"
)

# Exactly 26 alphanumeric characters, case-insensitive.
ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", flags=re.IGNORECASE)

# Hyphenated UUID with version digit 1-5 and variant nibble 8/9/a/b.
# Only the end is anchored, so use it with ``match`` to anchor the start.
UUID_VERIFY = re.compile(
    "[a-f0-9]{8}-"
    "[a-f0-9]{4}-"
    "[1-5]"
    "[a-f0-9]{3}-"
    "[89ab][a-f0-9]{3}-"
    "[a-f0-9]{12}$",
    flags=re.IGNORECASE,
)

# Discord invite links; group 1 captures the invite code.
# NOTE(review): the dots in "www." and "discord." are unescaped, so they
# match any single character, not just a literal dot.
invites = re.compile(
    r"(?:https?://)?(?:www.)?"
    r"(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)"
    r"/([^\s/]+?)(?=\b)",
    re.IGNORECASE,
)

# Name-based UUID constructors keyed by UUID version number.
UUID_GET = {version: getattr(uuidpy, f"uuid{version}") for version in (3, 5)}
def hash_obj(hash, data, text: bool = True) -> str:
    """
    Feed data into a hash object and return its hex digest.

    When ``text`` is truthy, ``data`` is a string and is hashed as its
    UTF-8 encoding. Otherwise ``data`` is treated as a sliceable binary
    buffer and is fed to the hash in 64 KiB chunks.
    """
    if not text:
        chunk_size = 65536
        for offset in range(0, len(data), chunk_size):
            hash.update(data[offset : offset + chunk_size])
    else:
        hash.update(data.encode("UTF-8"))
    return hash.hexdigest()
class DevCog(commands.Cog):
    """
    Developer utility slash commands.

    Provides hashing, UUID / ObjectId / ULID generation and conversion,
    base-N encoding and decoding, and a lines-of-code report.
    """

    # Prefixes of the ``base64`` module codecs offered to users; "encode"
    # or "decode" is appended to look up the actual function.
    base64_methods = ["b64", "b16", "b32", "a85", "b85"]

    def __init__(self, bot):
        self.bot = bot

    @cog_ext.cog_slash(
        name="hash",
        description="Hash some data",
        options=[
            create_option(
                name="method",
                description="Hash method",
                option_type=3,
                required=True,
                # Sorted so the registered choices are identical on every
                # start; iterating the set directly gives an arbitrary order.
                choices=[
                    create_choice(name=x, value=x)
                    for x in sorted(supported_hashes)
                ],
            ),
            create_option(
                name="data",
                description="Data to hash",
                option_type=3,
                required=True,
            ),
        ],
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _hash(self, ctx, method: str, data: str) -> None:
        """
        Hash ``data`` with the chosen algorithm and reply with an embed.

        The embed shows the size of the hashed input and the hex digest.
        An empty ``data`` gets a hidden error reply instead.
        """
        if not data:
            await ctx.send(
                "No data to hash",
                hidden=True,
            )
            return
        # Default to sha256, just in case
        hasher = getattr(hashlib, method, hashlib.sha256)()
        digest = hash_obj(hasher, data, True)
        # The hash is computed over the UTF-8 bytes, so report that byte
        # count rather than the number of characters.
        data_size = convert_bytesize(len(data.encode("UTF-8")))
        fields = [
            Field("Data Size", data_size, False),
            Field("Hash", f"`{digest}`", False),
        ]
        embed = build_embed(
            title=data,
            description="Hashed using " + method,
            fields=fields,
        )
        await ctx.send(embed=embed)

    @cog_ext.cog_slash(
        name="uuid",
        description="Generate a UUID",
        options=[
            create_option(
                name="version",
                description="UUID version",
                option_type=3,
                required=True,
                choices=[
                    create_choice(name=x, value=x) for x in ["3", "4", "5"]
                ],
            ),
            create_option(
                name="data",
                description="Data for UUID version 3,5",
                option_type=3,
                required=False,
            ),
        ],
    )
    async def _uuid(self, ctx, version: str, data: str = None) -> None:
        """
        Generate a UUID of the requested version.

        Version 4 is random and ignores ``data``. Versions 3 and 5 are
        name-based and require ``data``; the namespace is chosen by the
        first pattern ``data`` matches (OID, URL, X.500 DN), falling back
        to the DNS namespace.
        """
        version = int(version)
        if version == 4:
            await ctx.send(f"UUID4: `{uuidpy.uuid4()}`")
            return
        if not data:
            await ctx.send(f"UUID{version} requires data.", hidden=True)
            return
        if OID_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_OID
        elif URL_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_URL
        elif DN_VERIFY.match(data):
            namespace = uuidpy.NAMESPACE_X500
        else:
            namespace = uuidpy.NAMESPACE_DNS
        to_send = UUID_GET[version](namespace, data)
        await ctx.send(f"UUID{version}: `{to_send}`")

    @cog_ext.cog_slash(
        name="objectid",
        description="Generate an ObjectID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _objectid(self, ctx) -> None:
        """Generates new bson.ObjectId"""
        await ctx.send(f"ObjectId: `{str(ObjectId())}`")

    @cog_ext.cog_slash(
        name="ulid",
        description="Generate a ULID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _ulid(self, ctx) -> None:
        """Generates a new ULID"""
        await ctx.send(f"ULID: `{ulidpy.new().str}`")

    @cog_ext.cog_slash(
        name="uuid2ulid",
        description="Convert a UUID to a ULID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _uuid2ulid(self, ctx: commands.Context, uuid) -> None:
        """Converts a UUID to a ULID, or replies "Invalid UUID"."""
        if UUID_VERIFY.match(uuid):
            u = ulidpy.parse(uuid)
            await ctx.send(f"ULID: `{u.str}`")
        else:
            await ctx.send("Invalid UUID")

    @cog_ext.cog_slash(
        name="ulid2uuid",
        description="Convert a ULID to a UUID",
    )
    @commands.cooldown(1, 2, commands.BucketType.user)
    async def _ulid2uuid(self, ctx: commands.Context, ulid) -> None:
        """Converts a ULID to a UUID, or replies "Invalid ULID."."""
        if not ULID_VERIFY.match(ulid):
            await ctx.send("Invalid ULID.")
            return
        try:
            parsed = ulidpy.parse(ulid)
        except ValueError:
            # ULID_VERIFY only checks for 26 alphanumerics, so letters
            # outside the ULID alphabet or an out-of-range value can still
            # reach the parser and be rejected there.
            await ctx.send("Invalid ULID.")
            return
        await ctx.send(f"UUID: `{parsed.uuid}`")

    @cog_ext.cog_slash(
        name="encode",
        description="Encode some data",
        options=[
            create_option(
                name="method",
                description="Encode method",
                option_type=3,
                required=True,
                choices=[
                    create_choice(name=x, value=x) for x in base64_methods
                ],
            ),
            create_option(
                name="data",
                description="Data to encode",
                option_type=3,
                required=True,
            ),
        ],
    )
    async def _encode(self, ctx, method: str, data: str) -> None:
        """Encodes text with specified encoding method"""
        encoder = getattr(base64, method + "encode")
        encoded = encoder(data.encode("UTF-8")).decode("UTF-8")
        await ctx.send(f"`{encoded}`")

    @cog_ext.cog_slash(
        name="decode",
        description="Decode some data",
        options=[
            create_option(
                name="method",
                description="Decode method",
                option_type=3,
                required=True,
                choices=[
                    create_choice(name=x, value=x) for x in base64_methods
                ],
            ),
            create_option(
                name="data",
                description="Data to decode",
                option_type=3,
                required=True,
            ),
        ],
    )
    async def _decode(self, ctx, method: str, data: str) -> None:
        """
        Decodes text with specified encoding method.

        Replies with a hidden error when the input is not valid for the
        chosen method or does not decode to UTF-8 text, and refuses to
        post a result that contains a Discord invite link.
        """
        decoder = getattr(base64, method + "decode")
        try:
            decoded = decoder(data.encode("UTF-8")).decode("UTF-8")
        except ValueError:
            # binascii.Error (malformed input) and UnicodeDecodeError
            # (payload is not UTF-8 text) are both ValueError subclasses.
            await ctx.send(
                "Unable to decode that data with the selected method",
                hidden=True,
            )
            return
        if invites.search(decoded):
            await ctx.send(
                "Please don't use this to bypass invite restrictions",
                hidden=True,
            )
            return
        await ctx.send(f"`{decoded}`")

    @cog_ext.cog_slash(
        name="cloc",
        description="Get J.A.R.V.I.S. lines of code",
    )
    @commands.cooldown(1, 30, commands.BucketType.channel)
    async def _cloc(self, ctx) -> None:
        """Runs ``tokei`` in the working directory and posts its report."""
        # NOTE(review): check_output is synchronous, so the coroutine does
        # not yield while tokei runs.
        output = subprocess.check_output(
            ["tokei", "-C", "--sort", "code"]
        ).decode("UTF-8")
        await ctx.send(f"```\n{output}\n```")
def setup(bot):
    """Extension entry point: create the DevCog and add it to ``bot``."""
    dev_cog = DevCog(bot)
    bot.add_cog(dev_cog)