Remove 90% of owner cog
This commit is contained in:
parent
7ad353e62e
commit
d7c9742ce5
1 changed files with 1 additions and 196 deletions
|
@ -1,20 +1,9 @@
|
||||||
"""J.A.R.V.I.S. Owner Cog."""
|
"""J.A.R.V.I.S. Owner Cog."""
|
||||||
import os
|
from discord import User
|
||||||
import sys
|
|
||||||
import traceback
|
|
||||||
from inspect import getsource
|
|
||||||
from time import time
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import discord
|
|
||||||
from discord import DMChannel, User
|
|
||||||
from discord.ext import commands
|
from discord.ext import commands
|
||||||
|
|
||||||
import jarvis
|
|
||||||
from jarvis.config import reload_config
|
from jarvis.config import reload_config
|
||||||
from jarvis.db.models import Config
|
from jarvis.db.models import Config
|
||||||
from jarvis.utils import update
|
|
||||||
from jarvis.utils.permissions import user_is_bot_admin
|
|
||||||
|
|
||||||
|
|
||||||
class OwnerCog(commands.Cog):
|
class OwnerCog(commands.Cog):
|
||||||
|
@ -28,127 +17,6 @@ class OwnerCog(commands.Cog):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.admins = Config.objects(key="admins").first()
|
self.admins = Config.objects(key="admins").first()
|
||||||
|
|
||||||
@commands.command(name="load", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _load_cog(self, ctx: commands.Context, *, cog: str) -> None:
|
|
||||||
info = await self.bot.application_info()
|
|
||||||
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
||||||
try:
|
|
||||||
if "jarvis.cogs." not in cog:
|
|
||||||
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
||||||
self.bot.load_extension(cog)
|
|
||||||
except commands.errors.ExtensionAlreadyLoaded:
|
|
||||||
await ctx.send(f"Cog `{cog}` already loaded")
|
|
||||||
except Exception as e:
|
|
||||||
await ctx.send(f"Failed to load new cog `{cog}`: {type(e).name} - {e}")
|
|
||||||
else:
|
|
||||||
await ctx.send(f"Successfully loaded new cog `{cog}`")
|
|
||||||
else:
|
|
||||||
await ctx.send("I'm afraid I can't let you do that")
|
|
||||||
|
|
||||||
@commands.command(name="unload", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _unload_cog(self, ctx: commands.Context, *, cog: str) -> None:
|
|
||||||
if cog in ["jarvis.cogs.owner", "owner"]:
|
|
||||||
await ctx.send("Cannot unload `owner` cog")
|
|
||||||
return
|
|
||||||
info = await self.bot.application_info()
|
|
||||||
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
||||||
try:
|
|
||||||
if "jarvis.cogs." not in cog:
|
|
||||||
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
||||||
self.bot.unload_extension(cog)
|
|
||||||
except commands.errors.ExtensionNotLoaded:
|
|
||||||
await ctx.send(f"Cog `{cog}` not loaded")
|
|
||||||
except Exception as e:
|
|
||||||
await ctx.send(f"Failed to unload cog `{cog}` {type(e).__name__} - {e}")
|
|
||||||
else:
|
|
||||||
await ctx.send(f"Successfully unloaded cog `{cog}`")
|
|
||||||
else:
|
|
||||||
await ctx.send("I'm afraid I can't let you do that")
|
|
||||||
|
|
||||||
@commands.command(name="reload", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _cog_reload(self, ctx: commands.Context, *, cog: str) -> None:
|
|
||||||
if cog in ["jarvis.cogs.owner", "owner"]:
|
|
||||||
await ctx.send("Cannot reload `owner` cog")
|
|
||||||
return
|
|
||||||
info = await self.bot.application_info()
|
|
||||||
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
||||||
try:
|
|
||||||
if "jarvis.cogs." not in cog:
|
|
||||||
cog = "jarvis.cogs." + cog.split(".")[-1]
|
|
||||||
try:
|
|
||||||
self.bot.load_extension(cog)
|
|
||||||
except commands.errors.ExtensionNotLoaded:
|
|
||||||
pass
|
|
||||||
self.bot.unload_extension(cog)
|
|
||||||
except Exception as e:
|
|
||||||
await ctx.send(f"Failed to reload cog `{cog}` {type(e).__name__} - {e}")
|
|
||||||
else:
|
|
||||||
await ctx.send(f"Successfully reloaded cog `{cog}`")
|
|
||||||
else:
|
|
||||||
await ctx.send("I'm afraid I can't let you do that")
|
|
||||||
|
|
||||||
@commands.group(name="system", hidden=True, pass_context=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _system(self, ctx: commands.Context) -> None:
|
|
||||||
if ctx.invoked_subcommand is None:
|
|
||||||
await ctx.send("Usage: `system <subcommand>`\n" + "Subcommands: `restart`, `update`")
|
|
||||||
|
|
||||||
@_system.command(name="restart", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _restart(self, ctx: commands.Context) -> None:
|
|
||||||
info = await self.bot.application_info()
|
|
||||||
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
||||||
await ctx.send("Restarting core systems...")
|
|
||||||
if isinstance(ctx.channel, discord.channel.DMChannel):
|
|
||||||
jarvis.restart_ctx = {
|
|
||||||
"user": ctx.message.author.id,
|
|
||||||
"channel": ctx.channel.id,
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
jarvis.restart_ctx = {
|
|
||||||
"guild": ctx.message.guild.id,
|
|
||||||
"channel": ctx.channel.id,
|
|
||||||
}
|
|
||||||
await self.bot.close()
|
|
||||||
else:
|
|
||||||
await ctx.send("I'm afraid I can't let you do that")
|
|
||||||
|
|
||||||
@_system.command(name="update", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _update(self, ctx: commands.Context) -> None:
|
|
||||||
info = await self.bot.application_info()
|
|
||||||
if ctx.message.author == info.owner or ctx.message.author.id in self.admins.value:
|
|
||||||
await ctx.send("Updating core systems...")
|
|
||||||
status = update()
|
|
||||||
if status == 0:
|
|
||||||
await ctx.send("Core systems updated. Restarting...")
|
|
||||||
if isinstance(ctx.channel, discord.channel.DMChannel):
|
|
||||||
jarvis.restart_ctx = {
|
|
||||||
"user": ctx.message.author.id,
|
|
||||||
"channel": ctx.channel.id,
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
jarvis.restart_ctx = {
|
|
||||||
"guild": ctx.message.guild.id,
|
|
||||||
"channel": ctx.channel.id,
|
|
||||||
}
|
|
||||||
await self.bot.close()
|
|
||||||
elif status == 1:
|
|
||||||
await ctx.send("Core systems already up to date.")
|
|
||||||
elif status == 2:
|
|
||||||
await ctx.send("Core system update available, but core is dirty.")
|
|
||||||
else:
|
|
||||||
await ctx.send("I'm afraid I can't let you do that")
|
|
||||||
|
|
||||||
@_system.command(name="refresh", hidden=True)
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _refresh(self, ctx: commands.Context) -> None:
|
|
||||||
reload_config()
|
|
||||||
await ctx.send("System refreshed")
|
|
||||||
|
|
||||||
@commands.group(name="admin", hidden=True, pass_context=True)
|
@commands.group(name="admin", hidden=True, pass_context=True)
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def _admin(self, ctx: commands.Context) -> None:
|
async def _admin(self, ctx: commands.Context) -> None:
|
||||||
|
@ -177,69 +45,6 @@ class OwnerCog(commands.Cog):
|
||||||
reload_config()
|
reload_config()
|
||||||
await ctx.send(f"{user.mention} is no longer an admin.")
|
await ctx.send(f"{user.mention} is no longer an admin.")
|
||||||
|
|
||||||
def resolve_variable(self, variable: Any) -> Any:
|
|
||||||
"""Resolve a variable from eval."""
|
|
||||||
if hasattr(variable, "__iter__"):
|
|
||||||
var_length = len(list(variable))
|
|
||||||
if (var_length > 100) and (not isinstance(variable, str)):
|
|
||||||
return f"<a {type(variable).__name__} iterable " + f"with more than 100 values ({var_length})>"
|
|
||||||
elif not var_length:
|
|
||||||
return f"<an empty {type(variable).__name__} iterable>"
|
|
||||||
|
|
||||||
if (not variable) and (not isinstance(variable, bool)):
|
|
||||||
return f"<an empty {type(variable).__name__} object>"
|
|
||||||
return (
|
|
||||||
variable
|
|
||||||
if (len(f"{variable}") <= 1000)
|
|
||||||
else f"<a long {type(variable).__name__} object " + f"with the length of {len(f'{variable}'):,}>"
|
|
||||||
)
|
|
||||||
|
|
||||||
def prepare(self, string: str) -> str:
|
|
||||||
"""Prepare string for eval."""
|
|
||||||
arr = string.strip("```").replace("py\n", "").replace("python\n", "").split("\n")
|
|
||||||
if not arr[::-1][0].replace(" ", "").startswith("return"):
|
|
||||||
arr[len(arr) - 1] = "return " + arr[::-1][0]
|
|
||||||
return "".join(f"\n\t{i}" for i in arr)
|
|
||||||
|
|
||||||
@commands.command(pass_context=True, aliases=["eval", "exec", "evaluate"])
|
|
||||||
@user_is_bot_admin()
|
|
||||||
async def _eval(self, ctx: commands.Context, *, code: str) -> None:
|
|
||||||
if not isinstance(ctx.message.channel, DMChannel):
|
|
||||||
return
|
|
||||||
code = self.prepare(code)
|
|
||||||
args = {
|
|
||||||
"discord": discord,
|
|
||||||
"sauce": getsource,
|
|
||||||
"sys": sys,
|
|
||||||
"os": os,
|
|
||||||
"imp": __import__,
|
|
||||||
"this": self,
|
|
||||||
"ctx": ctx,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
exec( # noqa: S102
|
|
||||||
f"async def func():{code}",
|
|
||||||
globals().update(args),
|
|
||||||
locals(),
|
|
||||||
)
|
|
||||||
a = time()
|
|
||||||
response = await eval("func()", globals().update(args), locals()) # noqa: S307
|
|
||||||
if response is None or isinstance(response, discord.Message):
|
|
||||||
del args, code
|
|
||||||
return
|
|
||||||
|
|
||||||
if isinstance(response, str):
|
|
||||||
response = response.replace("`", "")
|
|
||||||
|
|
||||||
await ctx.send(
|
|
||||||
f"```py\n{self.resolve_variable(response)}```\n`{type(response).__name__} | {(time() - a) / 1000} ms`"
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
await ctx.send(f"Error occurred:```\n{traceback.format_exc()}```")
|
|
||||||
|
|
||||||
del args, code
|
|
||||||
|
|
||||||
|
|
||||||
def setup(bot: commands.Bot) -> None:
|
def setup(bot: commands.Bot) -> None:
|
||||||
"""Add OwnerCog to J.A.R.V.I.S."""
|
"""Add OwnerCog to J.A.R.V.I.S."""
|
||||||
|
|
Loading…
Add table
Reference in a new issue