jarvis-bot/jarvis/cogs/admin.py
2021-07-03 17:17:21 -06:00

525 lines
17 KiB
Python

import re
from datetime import datetime, timedelta

import pymongo
from discord import HTTPException, Member, NotFound, User
from discord.ext import commands
from discord.utils import find, get
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_choice, create_option

import jarvis
from jarvis.utils.db import DBManager
from jarvis.utils.permissions import admin_or_permissions
class AdminCog(commands.Cog):
    """
    Guild admin functions

    Used to manage guilds: ban/unban, kick, purge and mute/unmute slash
    commands. Every moderation action is also recorded in the ``jarvis``
    Mongo database (collections ``bans``, ``unbans``, ``kicks``,
    ``purges`` and ``mutes``).
    """

    # Reason used when a moderator does not supply one.
    _DEFAULT_REASON = (
        "Mr. Stark is displeased with your presence. Please leave."
    )

    def __init__(self, bot: commands.Bot):
        """Store the bot and open the database handle from the config."""
        self.bot = bot
        config = jarvis.config.get_config()
        # Collections are reached below as ``self.db.jarvis.<collection>``.
        self.db = DBManager(config.mongo).mongo

    @cog_ext.cog_slash(
        name="ban",
        description="Ban a user",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="user",
                description="User to ban",
                option_type=6,
                required=True,
            ),
            create_option(
                name="reason",
                description="Ban reason",
                required=True,
                option_type=3,
            ),
            create_option(
                name="type",
                description="Ban type",
                option_type=3,
                required=False,
                choices=[
                    create_choice(value="perm", name="Permanent"),
                    create_choice(value="temp", name="Temporary"),
                    create_choice(value="soft", name="Soft"),
                ],
            ),
            create_option(
                name="duration",
                description="Ban duration in hours if temporary",
                required=False,
                option_type=4,
            ),
        ],
        # The option is named "duration" but the handler parameter is
        # ``length``; options are passed as keyword arguments, so map it.
        connector={"duration": "length"},
    )
    @admin_or_permissions(ban_members=True)
    async def _ban(
        self,
        ctx: SlashContext,
        user: User = None,
        reason: str = None,
        type: str = "perm",
        length: int = 4,
    ):
        """Ban ``user`` from the guild and record the ban.

        Args:
            ctx: Slash command context.
            user: User to ban.
            reason: Ban reason; a default is used when empty.
            type: ``"perm"``, ``"temp"`` or ``"soft"``. A soft ban is an
                immediate ban followed by an unban.
            length: Duration in hours, only used for ``"temp"`` bans.
        """
        if not user or user == ctx.author:
            await ctx.send("You cannot ban yourself.")
            return
        if user == self.bot.user:
            await ctx.send("I'm afraid I can't let you do that")
            return
        if type == "temp" and length < 0:
            await ctx.send("You cannot set a temp ban to < 0 hours.")
            return
        if not reason:
            reason = self._DEFAULT_REASON

        # Only the permanent type is spelled differently in messages.
        mtype = "perma" if type == "perm" else type
        guild_name = ctx.guild.name
        user_message = (
            f"You have been {mtype}banned from {guild_name}."
            f" Reason:\n{reason}"
        )
        time = datetime.now()
        expiry = None
        if type == "temp":
            user_message += f"\nDuration: {length} hours"
            expiry = time + timedelta(hours=length)
        else:
            # Length is meaningless for non-temporary bans.
            length = None

        # Notifying the user is best-effort: a failed DM (e.g. DMs closed)
        # must not prevent the ban itself.
        try:
            await user.send(user_message)
        except HTTPException:
            await ctx.send("Unable to message user.")

        await ctx.guild.ban(user, reason=reason)
        if type == "soft":
            await ctx.guild.unban(user, reason="Ban was softban")
        await ctx.send(
            f"{user.name} has been {mtype}banned from {guild_name}."
            f" Reason:\n{reason}"
        )
        self.db.jarvis.bans.insert_one(
            {
                "user": user.id,
                "username": user.name,
                "discrim": user.discriminator,
                "reason": reason,
                "admin": ctx.author.id,
                "time": time,
                "guild": ctx.guild.id,
                "type": type,
                "length": length,
                "expiry": expiry,
                # A soft ban is lifted immediately, so it is never active.
                "active": type != "soft",
            }
        )

    async def discord_apply_unban(
        self, ctx: SlashContext, user: User, reason: str
    ):
        """Unban ``user`` in Discord and mirror the change in the database.

        Inserts an ``unbans`` record, marks every ``bans`` record for this
        user and guild inactive, then confirms in the channel.
        """
        await ctx.guild.unban(user, reason=reason)
        self.db.jarvis.unbans.insert_one(
            {
                "user": user.id,
                "username": user.name,
                "discrim": user.discriminator,
                "guild": ctx.guild.id,
                # Store the ID, as the other collections do; the member
                # object itself cannot be serialised.
                "admin": ctx.author.id,
                "reason": reason,
                "time": datetime.now(),
            }
        )
        self.db.jarvis.bans.update_many(
            {"user": user.id, "guild": ctx.guild.id},
            {"$set": {"active": False}},
        )
        await ctx.send("User successfully unbanned.\nReason: " + reason)

    @cog_ext.cog_slash(
        name="unban",
        description="Unban a user",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="id",
                description="User ID to unban",
                option_type=3,
                required=True,
            ),
            create_option(
                name="reason",
                description="Unban reason",
                required=True,
                option_type=3,
            ),
        ],
        # The option is named "id" but the handler parameter is ``user``.
        connector={"id": "user"},
    )
    @admin_or_permissions(ban_members=True)
    async def _unban(
        self,
        ctx: SlashContext,
        user: str,
        reason: str,
    ):
        """Unban a user identified by ID, ``name#discrim`` or plain name.

        The guild's Discord ban list is searched first. If nothing matches
        there, the ``bans`` collection is searched for an active record; if
        only the database knows about the ban, the record is deactivated
        and an ``unbans`` entry is written without touching Discord.

        Args:
            ctx: Slash command context.
            user: Numeric user ID, ``name#1234`` or bare user name.
            reason: Unban reason.
        """
        await ctx.defer()
        orig_user = user
        discrim = None
        discord_ban_info = None
        database_ban_info = None
        bans = await ctx.guild.bans()

        # Try to get ban information out of Discord. Each ban entry wraps
        # the banned account in its ``user`` attribute.
        if re.fullmatch(r"[0-9]+", user):  # User ID
            user = int(user)
            discord_ban_info = find(lambda x: x.user.id == user, bans)
        else:  # User name
            if re.search(r"#[0-9]{4}$", user):  # User name has discrim
                user, discrim = user.rsplit("#", 1)
            if discrim:
                discord_ban_info = find(
                    lambda x: (
                        x.user.name == user
                        and x.user.discriminator == discrim
                    ),
                    bans,
                )
            else:
                results = [x for x in bans if x.user.name == user]
                if len(results) > 1:
                    # A bare name is ambiguous here; refuse to guess and
                    # give the admin what they need for the next attempt.
                    matches = "\n".join(
                        f"{x.user.name}#{x.user.discriminator} "
                        f"({x.user.id})"
                        for x in results
                    )
                    await ctx.send(
                        f"Multiple bans match {orig_user}, "
                        "please retry with a user ID:\n" + matches
                    )
                    return
                if results:
                    discord_ban_info = results[0]

        # If we don't have the ban information in Discord,
        # try to find the relevant information in the database.
        # The parsing above already tells us which fields to search on.
        if not discord_ban_info:
            search = {"guild": ctx.guild.id, "active": True}
            if isinstance(user, int):
                search["user"] = user
            else:
                search["username"] = user
                if discrim:
                    search["discrim"] = discrim
            database_ban_info = self.db.jarvis.bans.find_one(search)

        if discord_ban_info:
            await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
            return
        if not database_ban_info:
            await ctx.send(f"Unable to find user {orig_user}")
            return

        # The database matched by name; retry the Discord list by the
        # stored user ID in case the account name no longer matches.
        banned_id = database_ban_info["user"]
        discord_ban_info = find(lambda x: x.user.id == banned_id, bans)
        if discord_ban_info:
            await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
            return

        # Only the database knows about this ban: deactivate it there.
        self.db.jarvis.bans.update_many(
            {"user": banned_id, "guild": ctx.guild.id},
            {"$set": {"active": False}},
        )
        self.db.jarvis.unbans.insert_one(
            {
                "user": banned_id,
                # Ban records are not guaranteed to carry a username.
                "username": database_ban_info.get("username"),
                "discrim": database_ban_info.get("discrim"),
                "guild": ctx.guild.id,
                "admin": ctx.author.id,
                "reason": reason,
                "time": datetime.now(),
            }
        )
        await ctx.send(
            "Unable to find user in Discord, "
            + "but removed entry from database."
        )

    @cog_ext.cog_subcommand(
        base="bans",
        name="list",
        description="List bans",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="type",
                description="Ban type",
                option_type=4,
                required=False,
                choices=[
                    create_choice(value=0, name="All"),
                    create_choice(value=1, name="Permanent"),
                    create_choice(value=2, name="Temporary"),
                    create_choice(value=3, name="Soft"),
                ],
            ),
            # Boolean options already offer True/False; Discord only
            # accepts ``choices`` on string/integer/number options.
            create_option(
                name="active",
                description="Active bans",
                option_type=5,
                required=False,
            ),
        ],
    )
    @admin_or_permissions(ban_members=True)
    async def _bans_list(
        self, ctx: SlashContext, type: int = 0, active: bool = True
    ):
        """List recorded bans for the guild, newest first.

        Args:
            ctx: Slash command context.
            type: 0 = all, 1 = permanent, 2 = temporary, 3 = soft.
            active: Whether to list active or inactive records. When
                listing active bans, Discord-side bans that have no active
                database record are appended as ``[unknown]``.
        """
        await ctx.defer()
        types = [0, "perm", "temp", "soft"]
        search = {"guild": ctx.guild.id, "active": active}
        if type > 0:
            search["type"] = types[type]
        bans = self.db.jarvis.bans.find(search).sort(
            [("time", pymongo.DESCENDING)]
        )

        lines = []
        for ban in bans:
            if "username" not in ban:
                # ``fetch_user`` raises instead of returning None when the
                # account no longer exists.
                try:
                    user = await self.bot.fetch_user(ban["user"])
                except NotFound:
                    user = None
                ban["username"] = user.name if user else "[deleted user]"
            lines.append(
                "[{0}] {1} ({2}): {3}\n".format(
                    ban["time"].strftime("%d-%m-%Y"),
                    ban["username"],
                    ban["user"],
                    ban["reason"],
                )
            )

        if active:
            # Bans that exist in Discord are live bans, so they only belong
            # in the active listing. Compare against every active record
            # (not just the filtered ones) so a type filter does not make
            # tracked bans show up as unknown.
            known_ids = set(
                self.db.jarvis.bans.distinct(
                    "user", {"guild": ctx.guild.id, "active": True}
                )
            )
            for entry in await ctx.guild.bans():
                if entry.user.id not in known_ids:
                    lines.append(
                        "[unknown] {0} ({1}): {2}\n".format(
                            entry.user.name, entry.user.id, entry.reason
                        )
                    )

        # NOTE(review): the message is sent in one piece; a long ban list
        # may need chunking.
        await ctx.send("Bans:\n```\n" + "".join(lines) + "```")

    @cog_ext.cog_slash(
        name="kick",
        description="Kick a user",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="user",
                description="User to kick",
                option_type=6,
                required=True,
            ),
            create_option(
                name="reason",
                description="Kick reason",
                required=False,
                option_type=3,
            ),
        ],
    )
    @admin_or_permissions(kick_members=True)
    async def _kick(self, ctx: SlashContext, user: User, reason=None):
        """Kick ``user`` from the guild and record the kick.

        Args:
            ctx: Slash command context.
            user: User to kick.
            reason: Kick reason; a default is used when empty.
        """
        if not user or user == ctx.author:
            await ctx.send("You cannot kick yourself.")
            return
        if user == self.bot.user:
            await ctx.send("I'm afraid I can't let you do that")
            return
        if not reason:
            reason = self._DEFAULT_REASON
        guild_name = ctx.guild.name
        # Notifying the user is best-effort; the kick proceeds regardless.
        try:
            await user.send(
                f"You have been kicked from {guild_name}. Reason:\n{reason}"
            )
        except HTTPException:
            await ctx.send("Unable to message user.")
        await ctx.guild.kick(user, reason=reason)
        await ctx.send(
            f"{user.name} has been kicked from {guild_name}."
            f" Reason:\n{reason}"
        )
        self.db.jarvis.kicks.insert_one(
            {
                "user": user.id,
                "reason": reason,
                "admin": ctx.author.id,
                "time": datetime.now(),
                "guild": ctx.guild.id,
            }
        )

    @cog_ext.cog_slash(
        name="purge",
        description="Purge messages from channel",
        guild_ids=[578757004059738142],
        options=[
            create_option(
                name="amount",
                description="Amount of messages to purge",
                required=False,
                option_type=4,
            )
        ],
    )
    @admin_or_permissions(manage_messages=True)
    async def _purge(self, ctx: SlashContext, amount: int = 10):
        """Delete the most recent messages in the channel and log the purge.

        Args:
            ctx: Slash command context.
            amount: Number of messages to purge; must be at least 1.
        """
        if amount < 1:
            await ctx.send("Amount must be >= 1")
            return
        await ctx.defer()
        channel = ctx.channel
        # One message more than requested is collected.
        # NOTE(review): presumably the extra one is the deferred response
        # posted by ``ctx.defer()`` -- confirm.
        messages = [
            message
            async for message in channel.history(limit=amount + 1)
        ]
        await channel.delete_messages(messages)
        self.db.jarvis.purges.insert_one(
            {
                "channel": ctx.channel.id,
                "guild": ctx.guild.id,
                "admin": ctx.author.id,
                "count": amount,
                "time": datetime.now(),
            }
        )

    @cog_ext.cog_slash(
        name="mute",
        description="Mute a user",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="user",
                description="User to mute",
                option_type=6,
                required=True,
            ),
            create_option(
                name="reason",
                description="Reason for mute",
                option_type=3,
                required=True,
            ),
            create_option(
                name="length",
                description="Mute length",
                option_type=4,
                required=False,
            ),
        ],
    )
    @admin_or_permissions(mute_members=True)
    async def _mute(
        self, ctx: SlashContext, user: Member, reason: str, length: int = 30
    ):
        """Give ``user`` the guild's configured mute role and record it.

        Args:
            ctx: Slash command context.
            user: Member to mute.
            reason: Reason for the mute.
            length: Mute length in minutes; any negative value is stored
                as ``-1`` with no expiry.
        """
        await ctx.defer()
        if user == ctx.author:
            await ctx.send("You cannot mute yourself.")
            return
        if user == self.bot.user:
            await ctx.send("I'm afraid I can't let you do that")
            return
        mute_setting = self.db.jarvis.settings.find_one(
            {"guild": ctx.guild.id, "setting": "mute"}
        )
        if not mute_setting:
            await ctx.send(
                "Please configure a mute role with /settings mute <role> first"
            )
            return
        role = get(ctx.guild.roles, id=mute_setting["value"])
        if role is None:
            # The stored role ID no longer resolves to a guild role.
            await ctx.send(
                "The configured mute role no longer exists. "
                "Please configure a mute role with /settings mute <role>"
            )
            return
        await user.add_roles(role, reason=reason)

        time = datetime.now()
        expiry = None
        if length < 0:
            length = -1
        else:
            expiry = time + timedelta(minutes=length)
        self.db.jarvis.mutes.insert_one(
            {
                "user": user.id,
                "reason": reason,
                "admin": ctx.author.id,
                "time": time,
                "guild": ctx.guild.id,
                "length": length,
                "expiry": expiry,
                # NOTE(review): mutes without an expiry are stored as
                # inactive; presumably "active" means "still needs to be
                # expired" -- confirm against whatever consumes it.
                "active": length >= 0,
            }
        )
        # Deactivate this user's mutes that expire before the new one.
        self.db.jarvis.mutes.update_many(
            {
                "guild": ctx.guild.id,
                "user": user.id,
                "expiry": {"$lt": expiry},
            },
            {"$set": {"active": False}},
        )
        await ctx.send(f"{user.mention} has been muted.\nReason: {reason}")

    @cog_ext.cog_slash(
        name="unmute",
        description="Unmute a user",
        guild_ids=[418094694325813248, 578757004059738142],
        options=[
            create_option(
                name="user",
                description="User to unmute",
                option_type=6,
                required=True,
            )
        ],
    )
    @admin_or_permissions(mute_members=True)
    async def _unmute(self, ctx: SlashContext, user: Member):
        """Remove the mute role from ``user`` and deactivate their mutes.

        Args:
            ctx: Slash command context.
            user: Member to unmute.
        """
        await ctx.defer()
        mute_setting = self.db.jarvis.settings.find_one(
            {"guild": ctx.guild.id, "setting": "mute"}
        )
        if not mute_setting:
            await ctx.send(
                "Please configure a mute role with "
                + "/settings mute <role> first."
            )
            return
        role = get(ctx.guild.roles, id=mute_setting["value"])
        if role not in user.roles:
            await ctx.send("User is not muted.")
            return
        await user.remove_roles(role, reason="Unmute")
        self.db.jarvis.mutes.update_many(
            {
                "guild": ctx.guild.id,
                "user": user.id,
            },
            {"$set": {"active": False}},
        )
        await ctx.send(f"{user.mention} has been unmuted.")
def setup(bot):
    """Create an ``AdminCog`` bound to ``bot`` and add it to the bot."""
    admin_cog = AdminCog(bot)
    bot.add_cog(admin_cog)