jarvis-bot/jarvis/cogs/admin.py

1069 lines
34 KiB
Python

import re
from datetime import datetime, timedelta
from typing import Union
import pymongo
from discord import Member, Role, TextChannel, User, VoiceChannel
from discord.ext import commands
from discord.utils import find, get
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_choice, create_option
import jarvis
from jarvis.utils import build_embed
from jarvis.utils.db import DBManager
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class AdminCog(commands.Cog):
"""
Guild admin functions
Used to manage guilds
"""
def __init__(self, bot: commands.Bot):
self.bot = bot
config = jarvis.config.get_config()
self.db = DBManager(config.mongo).mongo
@cog_ext.cog_slash(
name="ban",
description="Ban a user",
options=[
create_option(
name="user",
description="User to ban",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Ban reason",
required=True,
option_type=3,
),
create_option(
name="type",
description="Ban type",
option_type=3,
required=False,
choices=[
create_choice(value="perm", name="Permanent"),
create_choice(value="temp", name="Temporary"),
create_choice(value="soft", name="Soft"),
],
),
create_option(
name="duration",
description="Ban duration in hours if temporary",
required=False,
option_type=4,
),
],
)
@admin_or_permissions(ban_members=True)
async def _ban(
self,
ctx: SlashContext,
user: User = None,
reason: str = None,
type: str = "perm",
length: int = 4,
):
if not user or user == ctx.author:
await ctx.send("You cannot ban yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
if type == "temp" and length < 0:
await ctx.send(
"You cannot set a temp ban to < 0 hours.", hidden=True
)
return
if not reason:
reason = (
"Mr. Stark is displeased with your presence. Please leave."
)
mtype = type
if mtype == "perm":
mtype = "perma"
guild_name = ctx.guild.name
user_message = (
f"You have been {mtype}banned from {guild_name}."
+ f" Reason:\n{reason}"
)
time = datetime.now()
expiry = None
if mtype == "temp":
user_message += f"\nDuration: {length} hours"
expiry = time + timedelta(hours=length)
await user.send(user_message)
await ctx.guild.ban(user, reason=reason)
if mtype == "soft":
await ctx.guild.unban(user, reason="Ban was softban")
await ctx.send(
f"{user.name} has been {mtype}banned from {guild_name}."
+ f" Reason:\n{reason}"
)
if type != "temp":
length = None
active = True
if type == "soft":
active = False
self.db.jarvis.bans.insert_one(
{
"user": user.id,
"username": user.name,
"discrim": user.discriminator,
"reason": reason,
"admin": ctx.author.id,
"time": datetime.now(),
"guild": ctx.guild.id,
"type": type,
"length": length,
"expiry": expiry,
"active": active,
}
)
async def discord_apply_unban(
self, ctx: SlashContext, user: User, reason: str
):
await ctx.guild.unban(user, reason=reason)
self.db.jarvis.unbans.insert_one(
{
"user": user.id,
"username": user.name,
"discrim": user.discriminator,
"guild": ctx.guild.id,
"admin": ctx.author.id,
"reason": reason,
"time": datetime.now(),
}
)
_ = self.db.jarvis.bans.update(
{"user": user.id, "guild": ctx.guild.id},
{"$set": {"active": False}},
)
await ctx.send("User successfully unbanned.\nReason: " + reason)
@cog_ext.cog_slash(
name="unban",
description="Unban a user",
options=[
create_option(
name="user",
description="User to unban",
option_type=3,
required=True,
),
create_option(
name="reason",
description="Unban reason",
required=True,
option_type=3,
),
],
)
@admin_or_permissions(ban_members=True)
async def _unban(
self,
ctx: SlashContext,
user: str,
reason: str,
):
await ctx.defer()
orig_user = user
discrim = None
discord_ban_info = None
database_ban_info = None
bans = await ctx.guild.bans()
# Try to get ban information out of Discord
if re.match("^[0-9]{1,}$", user): # User ID
user = int(user)
discord_ban_info = find(lambda x: x.user.id == user, bans)
else: # User name
if re.match("#[0-9]{4}$", user): # User name has discrim
user, discrim = user.split("#")
if discrim:
discord_ban_info = find(
lambda x: x.user.name == user
and x.user.discriminator == discrim,
bans,
)
else:
results = [
x for x in filter(lambda x: x.user.name == user, bans)
]
if results:
if len(results) > 1:
active_bans = []
for ban in bans:
active_bans.append(
"{0} ({1}): {2}".format(
ban.user.name, ban.user.id, ban.reason
)
)
message = (
"More than one result. "
+ "Please use one of the following IDs:\n```"
+ "\n".join(active_bans)
+ "\n```"
)
await ctx.send(message)
return
else:
discord_ban_info = results[0]
# If we don't have the ban information in Discord,
# try to find the relevant information in the database.
# We take advantage of the previous checks to save CPU cycles
if not discord_ban_info:
if isinstance(user, int):
database_ban_info = self.db.jarvis.bans.find_one(
{"guild": ctx.guild.id, "user": user, "active": True}
)
else:
search = {
"guild": ctx.guild.id,
"username": user,
"active": True,
}
if discrim:
search["discrim"] = discrim
database_ban_info = self.db.jarvis.bans.find_one(search)
if not discord_ban_info and not database_ban_info:
await ctx.send(f"Unable to find user {orig_user}")
elif discord_ban_info:
await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
else:
discord_ban_info = find(
lambda x: x.user.id == database_ban_info["id"], bans
)
if discord_ban_info:
await self.discord_apply_unban(
ctx, discord_ban_info.user, reason
)
else:
self.db.jarvis.bans.update_many(
{"user": database_ban_info["id"], "guild": ctx.guild.id},
{"$set": {"active": False}},
)
self.db.jarvis.unbans.insert_one(
{
"user": database_ban_info["user"],
"username": database_ban_info["username"],
"discrim": database_ban_info["discrim"],
"guild": ctx.guild.id,
"admin": ctx.author.id,
"reason": reason,
"time": datetime.now(),
}
)
await ctx.send(
"Unable to find user in Discord, "
+ "but removed entry from database."
)
@cog_ext.cog_subcommand(
base="bans",
name="list",
description="List bans",
options=[
create_option(
name="type",
description="Ban type",
option_type=4,
required=False,
choices=[
create_choice(value=0, name="All"),
create_choice(value=1, name="Permanent"),
create_choice(value=2, name="Temporary"),
create_choice(value=3, name="Soft"),
],
),
create_option(
name="active",
description="Active bans",
option_type=4,
required=False,
choices=[
create_choice(value=1, name="Yes"),
create_choice(value=0, name="No"),
],
),
],
)
@admin_or_permissions(ban_members=True)
async def _bans_list(
self, ctx: SlashContext, type: int = 0, active: int = 1
):
active = bool(active)
await ctx.defer()
types = [0, "perm", "temp", "soft"]
search = {"guild": ctx.guild.id}
if active:
search["active"] = True
if type > 0:
search["type"] = types[type]
bans = self.db.jarvis.bans.find(search).sort(
[("time", pymongo.DESCENDING)]
)
ban_messages = []
db_bans = []
for ban in bans:
if "username" not in ban:
user = await self.bot.fetch_user(ban["user"])
ban["username"] = user.name if user else "[deleted user]"
ban_messages.append(
"[{0}] {1} ({2}): {3}".format(
ban["time"].strftime("%d-%m-%Y"),
ban["username"],
ban["user"],
ban["reason"],
)
)
db_bans.append(ban["user"])
bans = await ctx.guild.bans()
for ban in bans:
if ban.user.id not in db_bans:
ban_messages.append(
"[unknown] {0} ({1}): {2}".format(
ban.user.name, ban.user.id, ban.reason
)
)
message = ""
if len(ban_messages) == 0:
message = "No bans matched the criteria."
else:
message = "Active " if active else "Inactive "
message += "Bans:\n```\n" + "\n".join(ban_messages) + "\n```"
await ctx.send(message)
@cog_ext.cog_slash(
name="kick",
description="Kick a user",
options=[
create_option(
name="user",
description="User to kick",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Kick reason",
required=False,
option_type=3,
),
],
)
@admin_or_permissions(kick_members=True)
async def _kick(self, ctx: SlashContext, user: User, reason=None):
if not user or user == ctx.author:
await ctx.send("You cannot kick yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
if not reason:
reason = (
"Mr. Stark is displeased with your presence. Please leave."
)
guild_name = ctx.guild.name
try:
await user.send(
f"You have been kicked from {guild_name}. Reason:\n{reason}"
)
except Exception:
await ctx.send("Unable to message user.")
await ctx.guild.kick(user, reason=reason)
await ctx.send(
f"{user.name} has been kicked from {guild_name}."
+ f"Reason:\n{reason}"
)
self.db.jarvis.kicks.insert_one(
{
"user": user.id,
"reason": reason,
"admin": ctx.authod.id,
"time": datetime.now(),
"guild": ctx.guild.id,
}
)
@cog_ext.cog_slash(
name="purge",
description="Purge messages from channel",
options=[
create_option(
name="amount",
description="Amount of messages to purge",
required=False,
option_type=4,
)
],
)
@admin_or_permissions(manage_messages=True)
async def _purge(self, ctx: SlashContext, amount: int = 10):
if amount < 1:
await ctx.send("Amount must be >= 1", hidden=True)
return
await ctx.defer()
channel = ctx.channel
messages = []
async for message in channel.history(limit=amount + 1):
messages.append(message)
await channel.delete_messages(messages)
self.db.jarvis.purges.insert_one(
{
"channel": ctx.channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
"count": amount,
"time": datetime.now(),
}
)
@cog_ext.cog_slash(
name="mute",
description="Mute a user",
options=[
create_option(
name="user",
description="User to mute",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Reason for mute",
option_type=3,
required=True,
),
create_option(
name="length",
description="Mute length",
option_type=4,
required=False,
),
],
)
@admin_or_permissions(mute_members=True)
async def _mute(
self, ctx: SlashContext, user: Member, reason: str, length: int = 30
):
await ctx.defer()
if user == ctx.author:
await ctx.send("You cannot mute yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
mute_setting = self.db.jarvis.settings.find_one(
{"guild": ctx.guild.id, "setting": "mute"}
)
if not mute_setting:
await ctx.send(
"Please configure a mute role with /settings mute <role> first",
hidden=True,
)
return
role = get(ctx.guild.roles, id=mute_setting["value"])
await user.add_roles(role, reason=reason)
time = datetime.now()
expiry = None
if length < 0:
length = -1
if length >= 0:
expiry = time + timedelta(minutes=length)
self.db.jarvis.mutes.insert_one(
{
"user": user.id,
"reason": reason,
"admin": ctx.author.id,
"time": time,
"guild": ctx.guild.id,
"length": length,
"expiry": expiry,
"active": True if length >= 0 else False,
}
)
self.db.jarvis.mutes.update_many(
{
"guild": ctx.guild.id,
"user": user.id,
"expiry": {"$lt": expiry},
},
{"$set": {"active": False}},
)
await ctx.send(f"{user.mention} has been muted.\nReason: {reason}")
@cog_ext.cog_slash(
name="unmute",
description="Unmute a user",
options=[
create_option(
name="user",
description="User to unmute",
option_type=6,
required=True,
)
],
)
@admin_or_permissions(mute_members=True)
async def _unmute(self, ctx: SlashContext, user: Member):
await ctx.defer()
mute_setting = self.db.jarvis.settings.find_one(
{"guild": ctx.guild.id, "setting": "mute"}
)
if not mute_setting:
await ctx.send(
"Please configure a mute role with "
+ "/settings mute <role> first.",
hidden=True,
)
return
role = get(ctx.guild.roles, id=mute_setting["value"])
if role in user.roles:
await user.remove_roles(role, reason="Unmute")
else:
await ctx.send("User is not muted.", hidden=True)
return
self.db.jarvis.mutes.update_many(
{
"guild": ctx.guild.id,
"user": user.id,
},
{"$set": {"active": False}},
)
await ctx.send(f"{user.mention} has been unmuted.")
async def _lock_channel(
self,
channel: Union[TextChannel, VoiceChannel],
role: Role,
admin: User,
reason: str,
allow_send=False,
):
overrides = channel.overwrites_for(role)
if isinstance(channel, TextChannel):
overrides.send_messages = allow_send
elif isinstance(channel, VoiceChannel):
overrides.speak = allow_send
await channel.set_permissions(role, overwrite=overrides, reason=reason)
async def _unlock_channel(
self,
channel: Union[TextChannel, VoiceChannel],
role: Role,
admin: User,
):
overrides = channel.overwrites_for(role)
if isinstance(channel, TextChannel):
overrides.send_messages = None
elif isinstance(channel, VoiceChannel):
overrides.speak = None
await channel.set_permissions(role, overwrite=overrides)
@cog_ext.cog_slash(
name="lock",
description="Locks a channel",
options=[
create_option(
name="reason",
description="Lock Reason",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Lock duration in minutes (default 10)",
option_type=4,
required=False,
),
create_option(
name="channel",
description="Channel to lock",
option_type=7,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _lock(
self,
ctx: SlashContext,
reason: str,
duration: int = 10,
channel: Union[TextChannel, VoiceChannel] = None,
):
await ctx.defer()
if not channel:
channel = ctx.channel
for role in ctx.guild.roles:
try:
await self._lock_channel(channel, role, ctx.author, reason)
except Exception:
continue # Just continue on error
self.db.jarvis.locks.insert_one(
{
"channel": channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
"reason": reason,
"duration": duration,
"active": True,
"time": datetime.now(),
}
)
await ctx.send(f"{channel.mention} locked for {duration} minute(s)")
@cog_ext.cog_slash(
name="unlock",
description="Unlocks a channel",
options=[
create_option(
name="channel",
description="Channel to lock",
option_type=7,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _unlock(
self,
ctx: SlashContext,
channel: Union[TextChannel, VoiceChannel] = None,
):
await ctx.defer()
if not channel:
channel = ctx.channel
lock = self.db.jarvis.locks.find_one(
{"guild": ctx.guild.id, "channel": channel.id, "active": True}
)
if not lock:
await ctx.send(f"{channel.mention} not locked.", hidden=True)
return
for role in ctx.guild.roles:
try:
await self._unlock_channel(channel, role, ctx.author)
except Exception:
continue # Just continue on error
self.db.jarvis.locks.update_one(
{
"channel": channel.id,
"guild": ctx.guild.id,
},
{"$set": {"active": False}},
)
await ctx.send(f"{channel.mention} unlocked")
@cog_ext.cog_subcommand(
base="lockdown",
name="start",
description="Locks a server",
options=[
create_option(
name="reason",
description="Lockdown Reason",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Lockdown duration in minutes (default 10)",
option_type=4,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _lockdown_start(
self,
ctx: SlashContext,
reason: str,
duration: int = 10,
):
await ctx.defer()
channels = ctx.guild.channels
roles = ctx.guild.roles
updates = []
for channel in channels:
for role in roles:
try:
await self._lock_channel(channel, role, ctx.author, reason)
except Exception:
continue # Just continue on error
updates.append(
pymongo.InsertOne(
{
"channel": channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
"reason": reason,
"duration": duration,
"active": True,
"time": datetime.now(),
}
)
)
if updates:
self.db.jarvis.locks.bulk_write(updates)
await ctx.send(f"Server locked for {duration} minute(s)")
@cog_ext.cog_subcommand(
base="lockdown",
name="end",
description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
async def _lockdown_end(
self,
ctx: SlashContext,
):
await ctx.defer()
channels = ctx.guild.channels
roles = ctx.guild.roles
updates = []
locks = list(
self.db.jarvis.locks.find({"guild": ctx.guild.id, "active": True})
)
if not locks:
await ctx.send("No lockdown detected.", hidden=True)
return
for channel in channels:
for role in roles:
try:
await self._unlock_channel(channel, role, ctx.author)
except Exception:
continue # Just continue on error
updates.append(
pymongo.UpdateOne(
{
"channel": channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
},
{"$set": {"active": False}},
)
)
if updates:
self.db.jarvis.locks.bulk_write(updates)
await ctx.send("Server unlocked")
@cog_ext.cog_slash(
name="warn",
description="Warn a user",
options=[
create_option(
name="user",
description="User to warn",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Reason for warning",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Duration of warning in hours, default 24",
option_type=4,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _warn(
self, ctx: SlashContext, user: User, reason: str, duration: int = 24
):
await ctx.defer()
self.db.jarvis.warns.insert_one(
{
"user": user.id,
"reason": reason,
"admin": ctx.author.id,
"time": datetime.now(),
"guild": ctx.guild.id,
"duration": duration,
"active": True,
}
)
count = len(
list(
self.db.jarvis.warns.find(
{"user": user.id, "guild": ctx.guild.id, "active": True}
)
)
)
fields = [Field("Reason", reason, False)]
embed = build_embed(
title="Warning",
description=f"{user.mention} has been warned",
fields=fields,
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
@cog_ext.cog_slash(
name="warnings",
description="Get count of user warnings",
options=[
create_option(
name="user",
description="User to view",
option_type=6,
required=True,
),
],
)
@commands.has_permissions(administrator=True)
async def _warnings(self, ctx: SlashContext, user: User):
await ctx.defer()
warnings = list(
self.db.jarvis.warns.find(
{
"user": user.id,
"guild": ctx.guild.id,
}
)
)
active = len(list(filter(lambda x: x["active"], warnings)))
total = len(warnings)
fields = [Field("Active", active), Field("Total", total)]
embed = build_embed(
title="Warnings",
description=f"{user.mention} active and total warnings",
fields=fields,
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
@cog_ext.cog_subcommand(
base="roleping",
name="block",
description="Add a role to the roleping blocklist",
options=[
create_option(
name="role",
description="Role to add to blocklist",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
roles = self.db.jarvis.settings.find_one(
{"guild": ctx.guild.id, "setting": "roleping"}
)
if not roles:
roles = {"guild": ctx.guild.id, "setting": "roleping", "value": []}
if role.id in roles["value"]:
await ctx.send(
f"Role `{role.name}` already in blocklist.", hidden=True
)
return
roles["value"].append(role.id)
self.db.jarvis.settings.update_one(
{"guild": ctx.guild.id, "setting": "roleping"},
{"$set": roles},
upsert=True,
)
await ctx.send(f"Role `{role.name}` added to blocklist.")
@cog_ext.cog_subcommand(
base="roleping",
name="allow",
description="Remove a role from the roleping blocklist",
options=[
create_option(
name="role",
description="Role to remove from blocklist",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
roles = self.db.jarvis.settings.find_one(
{"guild": ctx.guild.id, "setting": "roleping"}
)
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
return
if role.id not in roles["value"]:
await ctx.send(
f"Role `{role.name}` not in blocklist.", hidden=True
)
return
roles["value"].delete(role.id)
self.db.jarvis.settings.update_one(
{"guild": ctx.guild.id, "setting": "roleping"},
{"$set": roles},
upsert=True,
)
await ctx.send(f"Role `{role.name}` removed blocklist.")
@cog_ext.cog_subcommand(
base="roleping",
name="list",
description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
roles = self.db.jarvis.settings.find_one(
{"guild": ctx.guild.id, "setting": "roleping"}
)
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
return
message = "Blocklisted Roles:\n```\n"
if not roles["value"]:
await ctx.send("No roles blocklisted.", hidden=True)
return
for role in roles["value"]:
role = ctx.guild.get_role(role)
if not role:
continue
message += role.name + "\n"
message += "```"
await ctx.send(message)
@cog_ext.cog_subcommand(
base="autopurge",
name="add",
description="Automatically purge messages after x seconds",
options=[
create_option(
name="channel",
description="Channel to autopurge",
option_type=7,
required=True,
),
create_option(
name="delay",
description="Seconds to keep message before purge, default 30",
option_type=4,
required=False,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_add(
self, ctx: SlashContext, channel: TextChannel, delay: int = 30
):
autopurge = self.db.jarvis.autopurge.find(
{"guild": ctx.guild.id, "channel": channel.id}
)
if autopurge:
await ctx.send("Autopurge already exists.", hidden=True)
return
autopurge = {
"guild": ctx.guild.id,
"channel": channel.id,
"admin": ctx.author.id,
"delay": delay,
"time": datetime.utcnow(),
}
self.db.jarvis.autopurge.insert_one(autopurge)
await ctx.send(
f"Autopurge set up on {channel.mention}, "
+ f"delay is {delay} seconds"
)
@cog_ext.cog_subcommand(
base="autopurge",
name="remove",
description="Remove an autopurge",
options=[
create_option(
name="channel",
description="Channel to remove from autopurge",
option_type=7,
required=True,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel):
autopurge = self.db.jarvis.autopurge.find(
{"guild": ctx.guild.id, "channel": channel.id}
)
if not autopurge:
await ctx.send("Autopurge does not exist.", hidden=True)
return
self.db.jarvis.autopurge.delete_one({"_id": autopurge["_id"]})
await ctx.send(f"Autopurge removed from {channel.mention}.")
@cog_ext.cog_subcommand(
base="autopurge",
name="update",
description="Update autopurge on a channel",
options=[
create_option(
name="channel",
description="Channel to update",
option_type=7,
required=True,
),
create_option(
name="delay",
description="New time to save",
option_type=4,
required=True,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_update(
self, ctx: SlashContext, channel: TextChannel, delay: int
):
autopurge = self.db.jarvis.autopurge.find(
{"guild": ctx.guild.id, "channel": channel.id}
)
if not autopurge:
await ctx.send("Autopurge does not exist.", hidden=True)
return
self.db.jarvis.autopurge.update_one(
{"_id": autopurge["_id"]}, {"$set": {"delay": delay}}
)
await ctx.send(
f"Autopurge delay updated to {delay} seconds on {channel.mention}."
)
def setup(bot):
bot.add_cog(AdminCog(bot))