246 lines
9.6 KiB
Python
246 lines
9.6 KiB
Python
"""J.A.R.V.I.S. Owner Cog."""
|
|
import os
|
|
import sys
|
|
import traceback
|
|
from inspect import getsource
|
|
from time import time
|
|
from typing import Any
|
|
|
|
import discord
|
|
from discord import DMChannel, User
|
|
from discord.ext import commands
|
|
|
|
import jarvis
|
|
from jarvis.config import reload_config
|
|
from jarvis.db.models import Config
|
|
from jarvis.utils import update
|
|
from jarvis.utils.permissions import user_is_bot_admin
|
|
|
|
|
|
class OwnerCog(commands.Cog):
|
|
"""
|
|
J.A.R.V.I.S. management cog.
|
|
|
|
Used by admins to control core J.A.R.V.I.S. systems
|
|
"""
|
|
|
|
def __init__(self, bot: commands.Cog):
|
|
self.bot = bot
|
|
self.admins = Config.objects(key="admins").first()
|
|
|
|
@commands.command(name="load", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _load_cog(self, ctx: commands.Context, *, cog: str) -> None:
|
|
info = await self.bot.application_info()
|
|
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
try:
|
|
if "jarvis.cogs." not in cog:
|
|
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
self.bot.load_extension(cog)
|
|
except commands.errors.ExtensionAlreadyLoaded:
|
|
await ctx.send(f"Cog `{cog}` already loaded")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to load new cog `{cog}`: {type(e).name} - {e}")
|
|
else:
|
|
await ctx.send(f"Successfully loaded new cog `{cog}`")
|
|
else:
|
|
await ctx.send("I'm afraid I can't let you do that")
|
|
|
|
@commands.command(name="unload", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _unload_cog(self, ctx: commands.Context, *, cog: str) -> None:
|
|
if cog in ["jarvis.cogs.owner", "owner"]:
|
|
await ctx.send("Cannot unload `owner` cog")
|
|
return
|
|
info = await self.bot.application_info()
|
|
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
try:
|
|
if "jarvis.cogs." not in cog:
|
|
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
self.bot.unload_extension(cog)
|
|
except commands.errors.ExtensionNotLoaded:
|
|
await ctx.send(f"Cog `{cog}` not loaded")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to unload cog `{cog}` {type(e).__name__} - {e}")
|
|
else:
|
|
await ctx.send(f"Successfully unloaded cog `{cog}`")
|
|
else:
|
|
await ctx.send("I'm afraid I can't let you do that")
|
|
|
|
@commands.command(name="reload", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _cog_reload(self, ctx: commands.Context, *, cog: str) -> None:
|
|
if cog in ["jarvis.cogs.owner", "owner"]:
|
|
await ctx.send("Cannot reload `owner` cog")
|
|
return
|
|
info = await self.bot.application_info()
|
|
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
try:
|
|
if "jarvis.cogs." not in cog:
|
|
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
try:
|
|
self.bot.load_extension(cog)
|
|
except commands.errors.ExtensionNotLoaded:
|
|
pass
|
|
self.bot.unload_extension(cog)
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to reload cog `{cog}` {type(e).__name__} - {e}")
|
|
else:
|
|
await ctx.send(f"Successfully reloaded cog `{cog}`")
|
|
else:
|
|
await ctx.send("I'm afraid I can't let you do that")
|
|
|
|
@commands.group(name="system", hidden=True, pass_context=True)
|
|
@user_is_bot_admin()
|
|
async def _system(self, ctx: commands.Context) -> None:
|
|
if ctx.invoked_subcommand is None:
|
|
await ctx.send("Usage: `system <subcommand>`\n" + "Subcommands: `restart`, `update`")
|
|
|
|
@_system.command(name="restart", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _restart(self, ctx: commands.Context) -> None:
|
|
info = await self.bot.application_info()
|
|
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
await ctx.send("Restarting core systems...")
|
|
if isinstance(ctx.channel, discord.channel.DMChannel):
|
|
jarvis.restart_ctx = {
|
|
"user": ctx.message.author.id,
|
|
"channel": ctx.channel.id,
|
|
}
|
|
else:
|
|
jarvis.restart_ctx = {
|
|
"guild": ctx.message.guild.id,
|
|
"channel": ctx.channel.id,
|
|
}
|
|
await self.bot.close()
|
|
else:
|
|
await ctx.send("I'm afraid I can't let you do that")
|
|
|
|
@_system.command(name="update", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _update(self, ctx: commands.Context) -> None:
|
|
info = await self.bot.application_info()
|
|
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
await ctx.send("Updating core systems...")
|
|
status = update()
|
|
if status == 0:
|
|
await ctx.send("Core systems updated. Restarting...")
|
|
if isinstance(ctx.channel, discord.channel.DMChannel):
|
|
jarvis.restart_ctx = {
|
|
"user": ctx.message.author.id,
|
|
"channel": ctx.channel.id,
|
|
}
|
|
else:
|
|
jarvis.restart_ctx = {
|
|
"guild": ctx.message.guild.id,
|
|
"channel": ctx.channel.id,
|
|
}
|
|
await self.bot.close()
|
|
elif status == 1:
|
|
await ctx.send("Core systems already up to date.")
|
|
elif status == 2:
|
|
await ctx.send("Core system update available, but core is dirty.")
|
|
else:
|
|
await ctx.send("I'm afraid I can't let you do that")
|
|
|
|
@_system.command(name="refresh", hidden=True)
|
|
@user_is_bot_admin()
|
|
async def _refresh(self, ctx: commands.Context) -> None:
|
|
reload_config()
|
|
await ctx.send("System refreshed")
|
|
|
|
@commands.group(name="admin", hidden=True, pass_context=True)
|
|
@commands.is_owner()
|
|
async def _admin(self, ctx: commands.Context) -> None:
|
|
if ctx.invoked_subcommand is None:
|
|
await ctx.send("Usage: `admin <subcommand>`\n" + "Subcommands: `add`, `remove`")
|
|
|
|
@_admin.command(name="add", hidden=True)
|
|
@commands.is_owner()
|
|
async def _add(self, ctx: commands.Context, user: User) -> None:
|
|
if user.id in self.admins.value:
|
|
await ctx.send(f"{user.mention} is already an admin.")
|
|
return
|
|
self.admins.value.append(user.id)
|
|
self.admins.save()
|
|
reload_config()
|
|
await ctx.send(f"{user.mention} is now an admin. Use this power carefully.")
|
|
|
|
@_admin.command(name="remove", hidden=True)
|
|
@commands.is_owner()
|
|
async def _remove(self, ctx: commands.Context, user: User) -> None:
|
|
if user.id not in self.admins.value:
|
|
await ctx.send(f"{user.mention} is not an admin.")
|
|
return
|
|
self.admins.value.remove(user.id)
|
|
self.admins.save()
|
|
reload_config()
|
|
await ctx.send(f"{user.mention} is no longer an admin.")
|
|
|
|
def resolve_variable(self, variable: Any) -> Any:
|
|
"""Resolve a variable from eval."""
|
|
if hasattr(variable, "__iter__"):
|
|
var_length = len(list(variable))
|
|
if (var_length > 100) and (not isinstance(variable, str)):
|
|
return f"<a {type(variable).__name__} iterable " + f"with more than 100 values ({var_length})>"
|
|
elif not var_length:
|
|
return f"<an empty {type(variable).__name__} iterable>"
|
|
|
|
if (not variable) and (not isinstance(variable, bool)):
|
|
return f"<an empty {type(variable).__name__} object>"
|
|
return (
|
|
variable
|
|
if (len(f"{variable}") <= 1000)
|
|
else f"<a long {type(variable).__name__} object " + f"with the length of {len(f'{variable}'):,}>"
|
|
)
|
|
|
|
def prepare(self, string: str) -> str:
|
|
"""Prepare string for eval."""
|
|
arr = string.strip("```").replace("py\n", "").replace("python\n", "").split("\n")
|
|
if not arr[::-1][0].replace(" ", "").startswith("return"):
|
|
arr[len(arr) - 1] = "return " + arr[::-1][0]
|
|
return "".join(f"\n\t{i}" for i in arr)
|
|
|
|
@commands.command(pass_context=True, aliases=["eval", "exec", "evaluate"])
|
|
@user_is_bot_admin()
|
|
async def _eval(self, ctx: commands.Context, *, code: str) -> None:
|
|
if not isinstance(ctx.message.channel, DMChannel):
|
|
return
|
|
code = self.prepare(code)
|
|
args = {
|
|
"discord": discord,
|
|
"sauce": getsource,
|
|
"sys": sys,
|
|
"os": os,
|
|
"imp": __import__,
|
|
"this": self,
|
|
"ctx": ctx,
|
|
}
|
|
|
|
try:
|
|
exec( # noqa: S102
|
|
f"async def func():{code}",
|
|
globals().update(args),
|
|
locals(),
|
|
)
|
|
a = time()
|
|
response = await eval("func()", globals().update(args), locals()) # noqa: S307
|
|
if response is None or isinstance(response, discord.Message):
|
|
del args, code
|
|
return
|
|
|
|
if isinstance(response, str):
|
|
response = response.replace("`", "")
|
|
|
|
await ctx.send(
|
|
f"```py\n{self.resolve_variable(response)}```\n`{type(response).__name__} | {(time() - a) / 1000} ms`"
|
|
)
|
|
except Exception:
|
|
await ctx.send(f"Error occurred:```\n{traceback.format_exc()}```")
|
|
|
|
del args, code
|
|
|
|
|
|
def setup(bot: commands.Bot) -> None:
|
|
"""Add OwnerCog to J.A.R.V.I.S."""
|
|
bot.add_cog(OwnerCog(bot))
|