jarvis-bot/jarvis/cogs/botutil.py

122 lines
4.9 KiB
Python

"""JARVIS bot utility commands."""
import asyncio
import logging
import platform
from collections import deque
from io import BytesIO
from typing import get_type_hints

import git
import psutil
from aiofile import AIOFile, LineReader
from dis_snek import MessageContext, Scale, Snake
from dis_snek.client.utils.misc_utils import find
from dis_snek.models.discord.embed import EmbedField
from dis_snek.models.discord.file import File
from molter import msg_command

from jarvis.utils import build_embed, get_all_commands
class BotutilCog(Scale):
    """JARVIS Bot Utility Cog.

    Message commands for inspecting and maintaining the running bot: sending
    the log file, showing host/bot information, raising a deliberate error,
    and pulling updates from the git remote. `is_owner` is registered as a
    scale check in `__init__`.
    """

    def __init__(self, bot: Snake):
        """Store the bot, create the module logger and register `is_owner` as a check."""
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        self.add_scale_check(self.is_owner)

    async def is_owner(self, ctx: MessageContext) -> bool:
        """Return True when the message author is the bot owner."""
        return ctx.author.id == self.bot.owner.id

    @msg_command(name="tail")
    async def _tail(self, ctx: MessageContext, count: int = 10) -> None:
        """Reply with the last ``count`` lines of ``jarvis.log``.

        Output of up to 1500 characters is sent inline in a code block; longer
        output is attached as ``tail_<count>.log`` instead.

        Args:
            ctx: Context of the invoking message.
            count: Number of trailing log lines to send. Must be at least 1.
        """
        # A non-positive count previously either produced an empty reply or,
        # for negative values, never trimmed and sent the entire log.
        if count < 1:
            await ctx.reply(content="Line count must be at least 1")
            return

        # A bounded deque keeps only the newest `count` lines while the file
        # is streamed, dropping the oldest line on each overflow.
        lines = deque(maxlen=count)
        async with AIOFile("jarvis.log", "r") as af:
            async for line in LineReader(af):
                lines.append(line)
        log = "".join(lines)

        if len(log) > 1500:
            with BytesIO() as file_bytes:
                file_bytes.write(log.encode("UTF8"))
                file_bytes.seek(0)
                attachment = File(file_bytes, file_name=f"tail_{count}.log")
                # Reply inside the `with` so the buffer is still open when sent.
                await ctx.reply(
                    content=f"Here's the last {count} lines of the log", file=attachment
                )
        else:
            await ctx.reply(content=f"```\n{log}\n```")

    @msg_command(name="log")
    async def _log(self, ctx: MessageContext) -> None:
        """Reply with the complete ``jarvis.log`` file as an attachment."""
        async with AIOFile("jarvis.log", "r") as af:
            raw = await af.read_bytes()
        with BytesIO(raw) as file_bytes:
            log = File(file_bytes, file_name="jarvis.log")
            # Reply inside the `with` so the buffer is still open when sent.
            await ctx.reply(content="Here's the latest log", file=log)

    @msg_command(name="crash")
    async def _crash(self, ctx: MessageContext) -> None:
        """Deliberately raise an exception.

        Raises:
            Exception: Always, with the message ``"As you wish"``.
        """
        raise Exception("As you wish")

    @msg_command(name="sysinfo")
    async def _sysinfo(self, ctx: MessageContext) -> None:
        """Send an embed with OS, Python version, system boot time and bot start time."""
        st_ts = int(self.bot.start_time.timestamp())
        ut_ts = int(psutil.boot_time())
        # Timestamps are rendered with Discord's <t:...> markup, once in long
        # form (F) and once as a relative time (R).
        fields = [
            EmbedField(name="Operating System", value=platform.system() or "Unknown", inline=False),
            EmbedField(name="Version", value=platform.release() or "N/A", inline=False),
            EmbedField(name="System Start Time", value=f"<t:{ut_ts}:F> (<t:{ut_ts}:R>)"),
            EmbedField(name="Python Version", value=platform.python_version()),
            EmbedField(name="Bot Start Time", value=f"<t:{st_ts}:F> (<t:{st_ts}:R>)"),
        ]
        embed = build_embed(title="System Info", description="", fields=fields)
        embed.set_image(url=self.bot.user.avatar.url)
        await ctx.send(embed=embed)

    @staticmethod
    def _commands_changed(old_commands, new_commands) -> bool:
        """Return True when a module's commands differ enough to need a reload.

        A module counts as changed when the number of commands differs, when a
        new command has no same-named counterpart among the old ones, or when
        the resolved type hints of any same-named pair differ.
        """
        if len(old_commands) != len(new_commands):
            return True
        for command in new_commands:
            name = command.__name__
            old_command = find(lambda old, name=name: old.__name__ == name, old_commands)
            if old_command is None:
                # Renamed or replaced command: nothing to compare hints against.
                return True
            # Dict inequality covers differing lengths, differing parameter
            # names and differing annotations in one comparison.
            if get_type_hints(old_command) != get_type_hints(command):
                return True
        return False

    @msg_command(name="update")
    async def _update(self, ctx: MessageContext) -> None:
        """Pull the active branch from ``origin`` and load or reload changed modules.

        Compares the local HEAD hash with the fetched remote hash of the active
        branch. When they differ, the changes are pulled, the command listing
        from `get_all_commands` is taken before and after the pull, new modules
        are loaded, and modules whose commands changed are reloaded (at most
        once each). The result is reported in a reply.
        """
        repo = git.Repo(".")
        current_hash = repo.head.object.hexsha
        origin = repo.remotes.origin
        # NOTE(review): fetch/pull are called synchronously (no await), so they
        # hold up the event loop while they run.
        origin.fetch()
        remote_hash = origin.refs[repo.active_branch.name].object.hexsha

        if current_hash == remote_hash:
            await ctx.reply(f"No updates to apply. `{current_hash}` == `{remote_hash}`")
            return

        self.logger.info("Updating...")
        current_commands = get_all_commands()
        origin.pull()
        self.logger.info("Changes pulled...")
        await asyncio.sleep(3)
        new_commands = get_all_commands()

        # assumes get_all_commands() maps module name -> sequence of command
        # callables, as implied by the .items()/len() usage — TODO confirm
        for module, commands in new_commands.items():
            if module not in current_commands:
                self.bot.load_extension(module)
            elif self._commands_changed(current_commands[module], commands):
                self.bot.reload_extension(module)

        self.logger.info("Updates applied")
        await ctx.reply(f"Updates applied! `{current_hash}` => `{remote_hash}`")
def setup(bot: Snake) -> None:
    """Instantiate BotutilCog for the given bot so it is added to JARVIS."""
    BotutilCog(bot)