jarvis-bot/jarvis/cogs/dev.py
2021-07-15 14:35:27 -06:00

364 lines
12 KiB
Python

import base64
import hashlib
import os
import re
import subprocess
import sys
import traceback
import uuid
from inspect import getsource
from time import time
import discord
import ulid
from bson import ObjectId
from discord.ext import commands
from discord_slash import cog_ext
import jarvis
from jarvis.utils import build_embed, convert_bytesize
from jarvis.utils.field import Field
from jarvis.utils.permissions import user_is_bot_admin
supported_hashes = {
x for x in hashlib.algorithms_guaranteed if "shake" not in x
}
OID_VERIFY = re.compile(r"^([1-9][0-9]{0,3}|0)(\.([1-9][0-9]{0,3}|0)){5,13}$")
URL_VERIFY = re.compile(
r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]"
+ r"|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)
DN_VERIFY = re.compile(
r"^(?:(?P<cn>CN=(?P<name>[^,]*)),)"
+ r"?(?:(?P<path>(?:(?:CN|OU)=[^,]+,?)+),)"
+ r"?(?P<domain>(?:DC=[^,]+,?)+)$"
)
ULID_VERIFY = re.compile(r"^[0-9a-z]{26}$", re.IGNORECASE)
UUID_VERIFY = re.compile(
(
"[a-f0-9]{8}-"
+ "[a-f0-9]{4}-"
+ "[1-5]"
+ "[a-f0-9]{3}-"
+ "[89ab][a-f0-9]{3}-"
+ "[a-f0-9]{12}$"
),
re.IGNORECASE,
)
UUID_GET = {3: uuid.uuid3, 5: uuid.uuid5}
def hash_obj(hash, data, text: bool = True) -> str:
"""
Hash data with hash object
Data can be text or binary
"""
if text:
hash.update(data.encode("UTF-8"))
return hash.hexdigest()
BSIZE = 65536
block_idx = 0
while block_idx * BSIZE < len(data):
block = data[BSIZE * block_idx : BSIZE * (block_idx + 1)]
hash.update(block)
block_idx += 1
return hash.hexdigest()
class DevCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
async def _hash(self, ctx, method: str, *, data: str = None):
if method not in supported_hashes:
algo_txt = ", ".join(f"`{x}`" for x in supported_hashes)
await ctx.send(
"Unsupported hash algorithm. Supported:\n" + algo_txt,
hidden=True,
)
return
if not data and len(ctx.message.attachments) == 0:
await ctx.send(
"No data to hash. Either attach a file or send text to hash"
)
return
text = True
if len(ctx.message.attachments) > 0:
text = False
data = await ctx.message.attachments[0].read()
# Default to sha256, just in case
hash = getattr(hashlib, method, hashlib.sha256)()
hex = hash_obj(hash, data, text)
data_size = convert_bytesize(len(data))
title = data if text else ctx.message.attachments[0].filename
description = "Hashed using " + method
fields = [
Field("Data Size", data_size, False),
Field("Hash", f"`{hex}`", False),
]
embed = build_embed(
title=title, description=description, fields=fields
)
await ctx.send(embed=embed)
@commands.command(name="hash")
async def _hash_pref(self, ctx, method: str, *, data: str = None):
await self._hash(ctx, method, data=data)
@cog_ext.cog_slash(
name="hash",
description="Hash some data",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _hash_slash(self, ctx, method: str, *, data: str = None):
await self._hash(ctx, method, data=data)
async def _uuid(self, ctx, version: str = None, data: str = None):
if not version:
await ctx.send("Supported UUID versions: 3, 4, 5", hidden=True)
return
if version not in ["3", "4", "5"]:
await ctx.send("Supported UUID versions: 3, 4, 5", hidden=True)
return
version = int(version)
if version in [3, 5] and not data:
await ctx.send(f"UUID{version} requires data.", hidden=True)
return
if version == 4:
await ctx.send(f"UUID4: `{uuid.uuid4()}`")
else:
to_send = None
if OID_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_OID, data)
elif URL_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_URL, data)
elif DN_VERIFY.match(data):
to_send = UUID_GET[version](uuid.NAMESPACE_X500, data)
else:
to_send = UUID_GET[version](uuid.NAMESPACE_DNS, data)
await ctx.send(f"UUID{version}: `{to_send}`")
@commands.command(name="uuid")
async def _uuid_pref(self, ctx, version: str = None, data: str = None):
await self._uuid(ctx, version, data)
@cog_ext.cog_slash(
name="uuid",
description="Generate a UUID",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _uuid_slash(self, ctx, version: str = None, data: str = None):
await self._uuid(ctx, version, data)
async def _objectid(self, ctx):
"""Generates new bson.ObjectId"""
await ctx.send(f"ObjectId: `{str(ObjectId())}`")
@commands.command(name="objectid")
async def _objectid_pref(self, ctx):
await self._objectid(ctx)
@cog_ext.cog_slash(
name="objectid",
description="Generate an ObjectID",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _objectid_slash(self, ctx):
await self._objectid(ctx)
async def _ulid(self, ctx):
"""Generates a new ULID"""
await ctx.send(f"ULID: `{ulid.new().str}`")
@commands.command(name="ulid")
async def _ulid_pref(self, ctx):
await self._ulid(ctx)
@cog_ext.cog_slash(
name="ulid",
description="Generate a ULID",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _ulid_slash(self, ctx):
await self._ulid(ctx)
async def _uuid2ulid(self, ctx: commands.Context, u):
"""Converts a UUID to a ULID"""
if UUID_VERIFY.match(u):
u = ulid.parse(u)
await ctx.send(f"ULID: `{u.str}`")
else:
await ctx.send("Invalid UUID")
@commands.command(name="uuid2ulid")
async def _uuid2ulid_pref(self, ctx, u: str):
await self._uuid2ulid(ctx, u)
@cog_ext.cog_slash(
name="uuid2ulid",
description="Convert a UUID to a ULID",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _uuid2ulid_slash(self, ctx, u: str):
await self._uuid2ulid(ctx, u)
async def _ulid2uuid(self, ctx: commands.Context, u):
"""Converts a ULID to a UUID"""
if ULID_VERIFY.match(u):
u = ulid.parse(u)
await ctx.send(f"UUID: `{u.uuid}`")
else:
await ctx.send("Invalid ULID.")
@commands.command(name="ulid2uuid")
async def _ulid2uuid_pref(self, ctx, u):
await self._ulid2uuid(ctx, u)
@cog_ext.cog_slash(
name="ulid2uuid",
description="Convert a ULID to a UUID",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _ulid2uuid_slash(self, ctx, u):
await self._ulid2uuid(ctx, u)
base64_methods = ["b64", "b16", "b32", "a85", "b85"]
async def _encode(self, ctx, method: str, data: str):
"""Encodes text with specified encoding method"""
if method not in self.base64_methods:
methods = ", ".join(f"`{x}`" for x in self.base64_methods)
await ctx.send(
"Usage: encode <method> <data>\nSupported methods:\n"
+ methods,
hidden=True,
)
return
method = getattr(base64, method + "encode")
await ctx.send(f"`{method(data.encode('UTF-8')).decode('UTF-8')}`")
@commands.command(name="encode")
async def _encode_pref(self, ctx, method: str, *, data: str):
await self._encode(ctx, method, data)
@cog_ext.cog_slash(
name="encode",
description="Encode using the base64 module",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _encode_slash(self, ctx, method: str, *, data: str):
await self._encode(ctx, method, data)
async def _decode(self, ctx, method: str, data: str):
"""Decodes text with specified encoding method"""
if method not in self.base64_methods:
methods = ", ".join(f"`{x}`" for x in self.base64_methods)
await ctx.send(
"Usage: decode <method> <data>\nSupported methods:\n"
+ methods,
hidden=True,
)
return
method = getattr(base64, method + "decode")
await ctx.send(f"`{method(data.encode('UTF-8')).decode('UTF-8')}`")
@commands.command(name="decode")
async def _decode_pref(self, ctx, method: str, *, data: str):
await self._decode(ctx, method, data)
@cog_ext.cog_slash(
name="decode",
description="Decode using the base64 module",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _decode_slash(self, ctx, method: str, *, data: str):
await self._decode(ctx, method, data)
async def _cloc(self, ctx):
output = subprocess.check_output(
["tokei", "-C", "--sort", "code"]
).decode("UTF-8")
await ctx.send(f"```\n{output}\n```")
@commands.command(name="cloc", help="Get J.A.R.V.I.S. lines of code")
async def _cloc_pref(self, ctx):
await self._cloc(ctx)
@cog_ext.cog_slash(
name="cloc",
description="Get J.A.R.V.I.S. lines of code",
guild_ids=[862402786116763668, 578757004059738142],
)
async def _cloc_slash(self, ctx):
await ctx.defer()
await self._cloc(ctx)
def resolve_variable(self, variable):
if hasattr(variable, "__iter__"):
var_length = len(list(variable))
if (var_length > 100) and (not isinstance(variable, str)):
return f"<a {type(variable).__name__} iterable with more than 100 values ({var_length})>"
elif not var_length:
return f"<an empty {type(variable).__name__} iterable>"
if (not variable) and (not isinstance(variable, bool)):
return f"<an empty {type(variable).__name__} object>"
return (
variable
if (len(f"{variable}") <= 1000)
else f"<a long {type(variable).__name__} object with the length of {len(f'{variable}'):,}>"
)
def prepare(self, string):
arr = (
string.strip("```")
.replace("py\n", "")
.replace("python\n", "")
.split("\n")
)
if not arr[::-1][0].replace(" ", "").startswith("return"):
arr[len(arr) - 1] = "return " + arr[::-1][0]
return "".join(f"\n\t{i}" for i in arr)
@commands.command(pass_context=True, aliases=["eval", "exec", "evaluate"])
@user_is_bot_admin()
async def _eval(self, ctx, *, code: str):
code = self.prepare(code)
args = {
"discord": discord,
"sauce": getsource,
"sys": sys,
"os": os,
"imp": __import__,
"this": self,
"ctx": ctx,
}
try:
exec(f"async def func():{code}", globals().update(args), locals())
a = time()
response = await eval("func()", globals().update(args), locals())
if response is None or isinstance(response, discord.Message):
del args, code
return
if isinstance(response, str):
response = response.replace("`", "")
await ctx.send(
f"```py\n{self.resolve_variable(response)}```"
+ f"`{type(response).__name__} | {(time() - a) / 1000} ms`"
)
except Exception:
await ctx.send(f"Error occurred:```\n{traceback.format_exc()}```")
del args, code
def setup(bot):
bot.add_cog(DevCog(bot))