jarvis-bot/jarvis/cogs/dev.py

303 lines
9.7 KiB
Python

import jarvis
import hashlib
import uuid
import re
import ulid
import base64
import subprocess
import discord
import sys
import os
import traceback
from time import time
from inspect import getsource
from discord.ext import commands
from jarvis.utils import build_embed, convert_bytesize
from jarvis.utils.field import Field
from bson import ObjectId
supported_hashes = {
x for x in hashlib.algorithms_guaranteed if "shake" not in x
}
OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$")
URL_VERIFY = re.compile(
r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]"
+ r"|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)
DN_VERIFY = re.compile(
r"^(?:(?P<cn>CN=(?P<name>[^,]*)),)"
+ r"?(?:(?P<path>(?:(?:CN|OU)=[^,]+,?)+),)"
+ r"?(?P<domain>(?:DC=[^,]+,?)+)$"
)
ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", re.IGNORECASE)
UUID_VERIFY = re.compile(
(
"[a-f0-9]{8}-"
+ "[a-f0-9]{4}-"
+ "[1-5]"
+ "[a-f0-9]{3}-"
+ "[89ab][a-f0-9]{3}-"
+ "[a-f0-9]{12}$"
),
re.IGNORECASE,
)
UUID_GET = {3: uuid.uuid3, 5: uuid.uuid5}
def hash_obj(hash, data, text: bool = True) -> str:
"""
Hash data with hash object
Data can be text or binary
"""
if text:
hash.update(data.encode("UTF-8"))
return hash.hexdigest()
BSIZE = 65536
block_idx = 0
while block_idx * BSIZE < len(data):
block = data[BSIZE * block_idx : BSIZE * (block_idx + 1)]
hash.update(block)
block_idx += 1
return hash.hexdigest()
class DevCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@commands.command(name="hash")
async def _hash(self, ctx, method: str, *, data: str = None):
if method not in supported_hashes:
algo_txt = ", ".join(f"`{x}`" for x in supported_hashes)
await ctx.send(
"Unsupported hash algorithm. Supported:\n" + algo_txt
)
return
if not data and len(ctx.message.attachments) == 0:
await ctx.send(
"No data to hash. Either attach a file or send text to hash"
)
return
text = True
if len(ctx.message.attachments) > 0:
text = False
data = await ctx.message.attachments[0].read()
# Default to sha256, just in case
hash = getattr(hashlib, method, hashlib.sha256)()
hex = hash_obj(hash, data, text)
data_size = convert_bytesize(len(data))
title = data if text else ctx.message.attachments[0].filename
description = "Hashed using " + method
fields = [
Field("Data Size", data_size, False),
Field("Hash", f"`{hex}`", False),
]
embed = build_embed(
title=title, description=description, fields=fields
)
await ctx.send(embed=embed)
@commands.command(name="uuid")
async def _uuid(self, ctx, version: str = None, data: str = None):
if not version:
await ctx.send("Supported UUID versions: 3, 4, 5")
return
if version not in ["3", "4", "5"]:
await ctx.send("Supported UUID versions: 3, 4, 5")
return
version = int(version)
if version in [3, 5] and not data:
await ctx.send(f"UUID{version} requires data.")
return
if version == 4:
await ctx.send(f"UUID4: `{uuid.uuid4()}`")
else:
to_send = None
if OID_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_OID, data)
elif URL_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_URL, data)
elif DN_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_X500, data)
else:
to_send = UUID_GET[version](uuid.NAMESPACE_DNS, data)
await ctx.send(f"UUID{version}: `{to_send}`")
@commands.command(name="objectid")
async def _objectid(self, ctx: commands.Context):
"""Generates new bson.ObjectId"""
await ctx.send(f"ObjectId: `{str(ObjectId())}`")
@commands.command(name="ulid")
async def _ulid(self, ctx: commands.Context):
"""Generates a new ULID"""
await ctx.send(f"ULID: `{ulid.new().str}`")
@commands.command(name="uuid2ulid")
async def _uuid2ulid(self, ctx: commands.Context, u):
"""Converts a UUID to a ULID"""
if UUID_VERIFY.match(u):
u = ulid.parse(u)
await ctx.send(f"ULID: `{u.str}`")
else:
await ctx.send("Invalid UUID")
@commands.command(name="ulid2uuid")
async def _ulid2uuid(self, ctx: commands.Context, u):
"""Converts a ULID to a UUID"""
if ULID_VERIFY.match(u):
u = ulid.parse(u)
await ctx.send(f"UUID: `{u.uuid}`")
else:
await ctx.send("Invalid ULID.")
@commands.group(name="encode")
async def _encode(self, ctx):
"""Encodes text with specified encoding method"""
if ctx.invoked_subcommand is None:
await ctx.send("Usage: encode <method> <data>")
@_encode.command(name="b64")
async def _b64e(self, ctx, *, data: str):
"""Base64 encoding"""
await ctx.send(base64.b64encode(data.encode("UTF-8")).decode("UTF-8"))
@_encode.command(name="b16")
async def _b16e(self, ctx, *, data: str):
"""Base16 encoding"""
await ctx.send(
"`" + base64.b16encode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_encode.command(name="b32")
async def _b32e(self, ctx, *, data: str):
"""Base32 encoding"""
await ctx.send(
"`" + base64.b32encode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_encode.command(name="a85")
async def _a85e(self, ctx, *, data: str):
"""ASCII85 encoding"""
await ctx.send(
"`" + base64.a85encode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_encode.command(name="b85")
async def _b85e(self, ctx, *, data: str):
"""Base85 encoding"""
await ctx.send(
"`" + base64.b85encode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@commands.group(name="decode")
async def _decode(self, ctx):
"""Decodes text with specified encoding method"""
if ctx.invoked_subcommand is None:
await ctx.send("Usage: decode <method> <data>")
@_decode.command(name="b64")
async def _b64d(self, ctx, *, data: str):
"""Base64 decoding"""
await ctx.send(
"`" + base64.b64decode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_decode.command(name="b16")
async def _b16d(self, ctx, *, data: str):
"""Base16 decoding"""
await ctx.send(
"`" + base64.b16decode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_decode.command(name="b32")
async def _b32d(self, ctx, *, data: str):
"""Base32 decoding"""
await ctx.send(
"`" + base64.b32decode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_decode.command(name="a85")
async def _a85d(self, ctx, *, data: str):
"""ASCII85 decoding"""
await ctx.send(
"`" + base64.a85decode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@_decode.command(name="b85")
async def _b85d(self, ctx, *, data: str):
"""Base85 decoding"""
await ctx.send(
"`" + base64.b85decode(data.encode("UTF-8")).decode("UTF-8") + "`"
)
@commands.command(name="cloc", help="Get J.A.R.V.I.S. lines of code")
async def _cloc(self, ctx):
output = subprocess.check_output(["cloc", "."]).decode("UTF-8")
await ctx.send(f"```\n{output}\n```")
def resolve_variable(self, variable):
if hasattr(variable, "__iter__"):
var_length = len(list(variable))
if (var_length > 100) and (not isinstance(variable, str)):
return f"<a {type(variable).__name__} iterable with more than 100 values ({var_length})>"
elif not var_length:
return f"<an empty {type(variable).__name__} iterable>"
if (not variable) and (not isinstance(variable, bool)):
return f"<an empty {type(variable).__name__} object>"
return (
variable
if (len(f"{variable}") <= 1000)
else f"<a long {type(variable).__name__} object with the length of {len(f'{variable}'):,}>"
)
def prepare(self, string):
arr = (
string.strip("```")
.replace("py\n", "")
.replace("python\n", "")
.split("\n")
)
if not arr[::-1][0].replace(" ", "").startswith("return"):
arr[len(arr) - 1] = "return " + arr[::-1][0]
return "".join(f"\n\t{i}" for i in arr)
@commands.command(pass_context=True, aliases=["eval", "exec", "evaluate"])
@commands.is_owner()
async def _eval(self, ctx, *, code: str):
args = {
"discord": discord,
"sauce": getsource,
"sys": sys,
"os": os,
"imp": __import__,
"this": self,
"ctx": ctx,
}
try:
exec(f"async def func():{code}", globals(), locals())
a = time()
response = await eval("func()", globals(), locals())
if response is None or isinstance(response, discord.Message):
del args, code
return
await ctx.send(
f"```py\n{self.resolve_variable(response)}````{type(response).__name__} | {(time() - a) / 1000} ms`"
)
except Exception as e:
await ctx.send(f"Error occurred:```\n{traceback.format_exc()}```")
del args, code
def setup(bot):
bot.add_cog(DevCog(bot))