# 1093 lines | 35 KiB | Python
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Union
|
|
|
|
import pymongo
|
|
from discord import Member, Role, TextChannel, User, VoiceChannel
|
|
from discord.ext import commands
|
|
from discord.utils import find, get
|
|
from discord_slash import SlashContext, cog_ext
|
|
from discord_slash.utils.manage_commands import create_choice, create_option
|
|
|
|
import jarvis
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.db import DBManager
|
|
from jarvis.utils.field import Field
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class AdminCog(commands.Cog):
|
|
"""
|
|
Guild admin functions
|
|
|
|
Used to manage guilds
|
|
"""
|
|
|
|
def __init__(self, bot: commands.Bot):
    """Keep a reference to the bot and open the database handle.

    The handle stored on ``self.db`` is the ``mongo`` attribute of a
    ``DBManager`` built from the ``mongo`` section of the jarvis config.
    """
    self.bot = bot
    mongo_settings = jarvis.config.get_config().mongo
    self.db = DBManager(mongo_settings).mongo
|
|
|
|
@cog_ext.cog_slash(
    name="ban",
    description="Ban a user",
    options=[
        create_option(
            name="user",
            description="User to ban",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Ban reason",
            required=True,
            option_type=3,
        ),
        create_option(
            name="type",
            description="Ban type",
            option_type=3,
            required=False,
            choices=[
                create_choice(value="perm", name="Permanent"),
                create_choice(value="temp", name="Temporary"),
                create_choice(value="soft", name="Soft"),
            ],
        ),
        create_option(
            name="duration",
            description="Ban duration in hours if temporary",
            required=False,
            option_type=4,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _ban(
    self,
    ctx: SlashContext,
    user: User = None,
    reason: str = None,
    type: str = "perm",
    length: int = 4,
    duration: int = None,
):
    """Ban ``user`` from the guild and record the ban in ``jarvis.bans``.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: User to ban. Banning yourself or the bot is refused.
        reason: Ban reason, at most 100 characters. A stock reason is
            used when none is given.
        type: ``"perm"``, ``"temp"`` or ``"soft"``. A soft ban is a ban
            immediately followed by an unban and is stored inactive.
        length: Temp-ban length in hours (kept for existing callers).
        duration: Temp-ban length in hours under the name the slash
            option is registered with; overrides ``length`` when given.
    """
    # The option above is registered as "duration", so that is the keyword
    # the value is expected to arrive under (TODO confirm against the
    # discord_slash option-to-kwarg mapping). Fold it into ``length``.
    if duration is not None:
        length = duration

    if not user or user == ctx.author:
        await ctx.send("You cannot ban yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if type == "temp" and length < 0:
        await ctx.send(
            "You cannot set a temp ban to < 0 hours.", hidden=True
        )
        return
    # Default first: len(None) would raise before the fallback was applied.
    if not reason:
        reason = "Mr. Stark is displeased with your presence. Please leave."
    elif len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    # "perm" reads as "permabanned" in messages; the stored type stays as-is.
    mtype = "perma" if type == "perm" else type

    guild_name = ctx.guild.name
    user_message = (
        f"You have been {mtype}banned from {guild_name}."
        + f" Reason:\n{reason}"
    )
    time = datetime.now()
    expiry = None
    if mtype == "temp":
        user_message += f"\nDuration: {length} hours"
        expiry = time + timedelta(hours=length)

    # Best effort, same as /kick: a failed DM must not block the ban itself.
    try:
        await user.send(user_message)
    except Exception:
        await ctx.send("Unable to message user.")

    await ctx.guild.ban(user, reason=reason)
    if mtype == "soft":
        await ctx.guild.unban(user, reason="Ban was softban")
    await ctx.send(
        f"{user.name} has been {mtype}banned from {guild_name}."
        + f" Reason:\n{reason}"
    )

    if type != "temp":
        length = None
    # A soft ban has already been lifted above, so it is never active.
    active = type != "soft"

    self.db.jarvis.bans.insert_one(
        {
            "user": user.id,
            "username": user.name,
            "discrim": user.discriminator,
            "reason": reason,
            "admin": ctx.author.id,
            # Same instant that ``expiry`` was derived from.
            "time": time,
            "guild": ctx.guild.id,
            "type": type,
            "length": length,
            "expiry": expiry,
            "active": active,
        }
    )
|
|
|
|
async def discord_apply_unban(
    self, ctx: SlashContext, user: User, reason: str
):
    """Lift a Discord ban and mirror the change in the database.

    Unbans ``user`` in ``ctx.guild``, inserts a record into
    ``jarvis.unbans``, marks the user's ban records for this guild as
    inactive and reports success through ``ctx``.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: The banned user to release.
        reason: Reason passed to Discord and stored with the unban.
    """
    await ctx.guild.unban(user, reason=reason)
    self.db.jarvis.unbans.insert_one(
        {
            "user": user.id,
            "username": user.name,
            "discrim": user.discriminator,
            "guild": ctx.guild.id,
            "admin": ctx.author.id,
            "reason": reason,
            "time": datetime.now(),
        }
    )
    # update_many (not the legacy single-document ``update``) so every ban
    # record for this user/guild is deactivated, matching /unban's
    # database-only path.
    self.db.jarvis.bans.update_many(
        {"user": user.id, "guild": ctx.guild.id},
        {"$set": {"active": False}},
    )
    await ctx.send("User successfully unbanned.\nReason: " + reason)
|
|
|
|
@cog_ext.cog_slash(
    name="unban",
    description="Unban a user",
    options=[
        create_option(
            name="user",
            description="User to unban",
            option_type=3,
            required=True,
        ),
        create_option(
            name="reason",
            description="Unban reason",
            required=True,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _unban(
    self,
    ctx: SlashContext,
    user: str,
    reason: str,
):
    """Unban a user identified by ID, ``name`` or ``name#discrim``.

    The guild's Discord ban list is searched first; if nothing matches,
    the active records in ``jarvis.bans`` are consulted. When only the
    database knows about the ban, its records are deactivated and an
    unban record is written without touching Discord.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: Numeric user ID, user name, or ``name#1234``.
        reason: Unban reason, at most 100 characters.
    """
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    orig_user = user
    discrim = None
    discord_ban_info = None
    database_ban_info = None

    bans = await ctx.guild.bans()

    # Try to get ban information out of Discord
    if re.match(r"^[0-9]{1,}$", user):  # User ID
        user = int(user)
        discord_ban_info = find(lambda x: x.user.id == user, bans)
    else:  # User name
        # search, not match: the discriminator sits at the END of the text.
        if re.search(r"#[0-9]{4}$", user):  # User name has discrim
            # rsplit so a "#" inside the name itself does not break unpacking
            user, discrim = user.rsplit("#", 1)
        if discrim:
            discord_ban_info = find(
                lambda x: x.user.name == user
                and x.user.discriminator == discrim,
                bans,
            )
        else:
            results = [x for x in bans if x.user.name == user]
            if len(results) > 1:
                # Only the ambiguous matches are offered, not every ban.
                active_bans = [
                    "{0} ({1}): {2}".format(
                        ban.user.name, ban.user.id, ban.reason
                    )
                    for ban in results
                ]
                message = (
                    "More than one result. "
                    + "Please use one of the following IDs:\n```"
                    + "\n".join(active_bans)
                    + "\n```"
                )
                await ctx.send(message)
                return
            if results:
                discord_ban_info = results[0]

    # If we don't have the ban information in Discord,
    # try to find the relevant information in the database.
    # The ID/name parsing done above is reused for the query.
    if not discord_ban_info:
        if isinstance(user, int):
            database_ban_info = self.db.jarvis.bans.find_one(
                {"guild": ctx.guild.id, "user": user, "active": True}
            )
        else:
            search = {
                "guild": ctx.guild.id,
                "username": user,
                "active": True,
            }
            if discrim:
                search["discrim"] = discrim
            database_ban_info = self.db.jarvis.bans.find_one(search)

    if not discord_ban_info and not database_ban_info:
        await ctx.send(f"Unable to find user {orig_user}", hidden=True)

    elif discord_ban_info:
        await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
    else:
        # Ban records store the Discord user ID under "user" (see /ban).
        banned_id = database_ban_info["user"]
        discord_ban_info = find(lambda x: x.user.id == banned_id, bans)
        if discord_ban_info:
            await self.discord_apply_unban(
                ctx, discord_ban_info.user, reason
            )
        else:
            self.db.jarvis.bans.update_many(
                {"user": banned_id, "guild": ctx.guild.id},
                {"$set": {"active": False}},
            )
            self.db.jarvis.unbans.insert_one(
                {
                    "user": database_ban_info["user"],
                    "username": database_ban_info["username"],
                    "discrim": database_ban_info["discrim"],
                    "guild": ctx.guild.id,
                    "admin": ctx.author.id,
                    "reason": reason,
                    "time": datetime.now(),
                }
            )
            await ctx.send(
                "Unable to find user in Discord, "
                + "but removed entry from database."
            )
|
|
|
|
@cog_ext.cog_subcommand(
    base="bans",
    name="list",
    description="List bans",
    options=[
        create_option(
            name="type",
            description="Ban type",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=0, name="All"),
                create_choice(value=1, name="Permanent"),
                create_choice(value=2, name="Temporary"),
                create_choice(value=3, name="Soft"),
            ],
        ),
        create_option(
            name="active",
            description="Active bans",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=1, name="Yes"),
                create_choice(value=0, name="No"),
            ],
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _bans_list(
    self, ctx: SlashContext, type: int = 0, active: int = 1
):
    """List the guild's bans, newest first.

    Args:
        ctx: Invocation context; supplies the guild.
        type: 0 for every type, otherwise an index into
            ``[_, "perm", "temp", "soft"]``.
        active: 1 to list active bans, 0 to list the others.
    """
    active = bool(active)
    types = [0, "perm", "temp", "soft"]
    search = {"guild": ctx.guild.id}
    # The heading below says "Inactive" when active is off, so the query
    # has to exclude active records instead of returning everything.
    # ``$ne`` also keeps records that carry no "active" field at all.
    search["active"] = True if active else {"$ne": True}
    if type > 0:
        search["type"] = types[type]
    bans = self.db.jarvis.bans.find(search).sort(
        [("time", pymongo.DESCENDING)]
    )
    ban_messages = []
    db_bans = set()
    for ban in bans:
        if "username" not in ban:
            # Records without a stored name are resolved through the API.
            user = await self.bot.fetch_user(ban["user"])
            ban["username"] = user.name if user else "[deleted user]"
        ban_messages.append(
            "[{0}] {1} ({2}): {3}".format(
                ban["time"].strftime("%d-%m-%Y"),
                ban["username"],
                ban["user"],
                ban["reason"],
            )
        )
        db_bans.add(ban["user"])
    # Bans that exist in Discord but not in the database are in force right
    # now, so they only belong in the active listing.
    if active:
        for ban in await ctx.guild.bans():
            if ban.user.id not in db_bans:
                ban_messages.append(
                    "[unknown] {0} ({1}): {2}".format(
                        ban.user.name, ban.user.id, ban.reason
                    )
                )
    if not ban_messages:
        message = "No bans matched the criteria."
    else:
        message = "Active " if active else "Inactive "
        message += "Bans:\n```\n" + "\n".join(ban_messages) + "\n```"
    await ctx.send(message)
|
|
|
|
@cog_ext.cog_slash(
    name="kick",
    description="Kick a user",
    options=[
        create_option(
            name="user",
            description="User to kick",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Kick reason",
            required=False,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(kick_members=True)
async def _kick(self, ctx: SlashContext, user: User, reason=None):
    """Kick ``user`` from the guild and record it in ``jarvis.kicks``.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: User to kick. Kicking yourself or the bot is refused.
        reason: Optional reason, at most 100 characters. A stock reason
            is used when omitted.
    """
    if not user or user == ctx.author:
        await ctx.send("You cannot kick yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    # The reason is optional: apply the default before measuring it, since
    # len(None) raises TypeError.
    if not reason:
        reason = "Mr. Stark is displeased with your presence. Please leave."
    elif len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    guild_name = ctx.guild.name
    # Best effort: failing to DM the user must not prevent the kick.
    try:
        await user.send(
            f"You have been kicked from {guild_name}. Reason:\n{reason}"
        )
    except Exception:
        await ctx.send("Unable to message user.")
    await ctx.guild.kick(user, reason=reason)
    await ctx.send(
        f"{user.name} has been kicked from {guild_name}."
        + f" Reason:\n{reason}"
    )
    self.db.jarvis.kicks.insert_one(
        {
            "user": user.id,
            "reason": reason,
            "admin": ctx.author.id,
            "time": datetime.now(),
            "guild": ctx.guild.id,
        }
    )
|
|
|
|
@cog_ext.cog_slash(
    name="purge",
    description="Purge messages from channel",
    options=[
        create_option(
            name="amount",
            description="Amount of messages to purge",
            required=False,
            option_type=4,
        )
    ],
)
@admin_or_permissions(manage_messages=True)
async def _purge(self, ctx: SlashContext, amount: int = 10):
    """Delete the most recent messages in the invoking channel.

    Args:
        ctx: Invocation context; supplies the channel, guild and admin.
        amount: Number of messages to purge; must be at least 1.
    """
    if amount < 1:
        await ctx.send("Amount must be >= 1", hidden=True)
        return
    await ctx.defer()
    channel = ctx.channel
    # One extra message is fetched on top of ``amount`` — presumably the
    # deferred-response placeholder created just above; TODO confirm.
    messages = []
    async for message in channel.history(limit=amount + 1):
        messages.append(message)
    # Bulk delete accepts at most 100 messages per call, so larger purges
    # are sent in batches instead of failing outright.
    batch_size = 100
    for start in range(0, len(messages), batch_size):
        await channel.delete_messages(messages[start:start + batch_size])
    self.db.jarvis.purges.insert_one(
        {
            "channel": ctx.channel.id,
            "guild": ctx.guild.id,
            "admin": ctx.author.id,
            "count": amount,
            "time": datetime.now(),
        }
    )
|
|
|
|
@cog_ext.cog_slash(
    name="mute",
    description="Mute a user",
    options=[
        create_option(
            name="user",
            description="User to mute",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for mute",
            option_type=3,
            required=True,
        ),
        create_option(
            name="length",
            description="Mute length",
            option_type=4,
            required=False,
        ),
    ],
)
@admin_or_permissions(mute_members=True)
async def _mute(
    self, ctx: SlashContext, user: Member, reason: str, length: int = 30
):
    """Give ``user`` the guild's mute role and record the mute.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: Member to mute. Muting yourself or the bot is refused.
        reason: Mute reason, at most 100 characters.
        length: Mute length in minutes. Any negative value is stored as
            ``-1`` with no expiry.
    """
    if user == ctx.author:
        await ctx.send("You cannot mute yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    mute_setting = self.db.jarvis.settings.find_one(
        {"guild": ctx.guild.id, "setting": "mute"}
    )
    if not mute_setting:
        await ctx.send(
            "Please configure a mute role with /settings mute <role> first",
            hidden=True,
        )
        return
    role = get(ctx.guild.roles, id=mute_setting["value"])
    # The stored role ID may point at a role that has since been deleted;
    # ``get`` then yields None, which must not be handed to add_roles.
    if role is None:
        await ctx.send(
            "Configured mute role no longer exists. "
            + "Please configure a mute role with /settings mute <role> first",
            hidden=True,
        )
        return
    await user.add_roles(role, reason=reason)
    time = datetime.now()
    expiry = None
    if length < 0:
        length = -1
    if length >= 0:
        expiry = time + timedelta(minutes=length)
    self.db.jarvis.mutes.insert_one(
        {
            "user": user.id,
            "reason": reason,
            "admin": ctx.author.id,
            "time": time,
            "guild": ctx.guild.id,
            "length": length,
            "expiry": expiry,
            # Only mutes with an expiry are stored as active.
            "active": length >= 0,
        }
    )
    # Deactivate this user's mute records that expire before the new one.
    self.db.jarvis.mutes.update_many(
        {
            "guild": ctx.guild.id,
            "user": user.id,
            "expiry": {"$lt": expiry},
        },
        {"$set": {"active": False}},
    )
    await ctx.send(f"{user.mention} has been muted.\nReason: {reason}")
|
|
|
|
@cog_ext.cog_slash(
    name="unmute",
    description="Unmute a user",
    options=[
        create_option(
            name="user",
            description="User to unmute",
            option_type=6,
            required=True,
        )
    ],
)
@admin_or_permissions(mute_members=True)
async def _unmute(self, ctx: SlashContext, user: Member):
    """Remove the guild's mute role from ``user``.

    Replies with a hidden notice when no mute role is configured or the
    member does not carry it; otherwise removes the role and marks every
    mute record for this user in this guild as inactive.
    """
    setting_query = {"guild": ctx.guild.id, "setting": "mute"}
    mute_setting = self.db.jarvis.settings.find_one(setting_query)
    if not mute_setting:
        await ctx.send(
            "Please configure a mute role with /settings mute <role> first.",
            hidden=True,
        )
        return

    mute_role = get(ctx.guild.roles, id=mute_setting["value"])
    # Guard clause: nothing to undo when the member lacks the role.
    if mute_role not in user.roles:
        await ctx.send("User is not muted.", hidden=True)
        return
    await user.remove_roles(mute_role, reason="Unmute")

    record_filter = {"guild": ctx.guild.id, "user": user.id}
    self.db.jarvis.mutes.update_many(
        record_filter, {"$set": {"active": False}}
    )
    await ctx.send(f"{user.mention} has been unmuted.")
|
|
|
|
async def _lock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
    reason: str,
    allow_send=False,
):
    """Set ``role``'s talk permission on ``channel`` to ``allow_send``.

    Text channels have ``send_messages`` overridden and voice channels
    ``speak``; any other channel type has its existing overwrite written
    back unchanged. ``admin`` is accepted but not used here.
    """
    overwrite = channel.overwrites_for(role)
    # First matching channel type wins, mirroring an if/elif chain.
    for channel_type, permission in (
        (TextChannel, "send_messages"),
        (VoiceChannel, "speak"),
    ):
        if isinstance(channel, channel_type):
            setattr(overwrite, permission, allow_send)
            break
    await channel.set_permissions(role, overwrite=overwrite, reason=reason)
|
|
|
|
async def _unlock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
):
    """Clear ``role``'s talk-permission override on ``channel``.

    Resets ``send_messages`` (text) or ``speak`` (voice) to ``None`` and
    writes the overwrite back. ``admin`` is accepted but not used here.
    """
    overwrite = channel.overwrites_for(role)
    # First matching channel type wins, mirroring an if/elif chain.
    for channel_type, permission in (
        (TextChannel, "send_messages"),
        (VoiceChannel, "speak"),
    ):
        if isinstance(channel, channel_type):
            setattr(overwrite, permission, None)
            break
    await channel.set_permissions(role, overwrite=overwrite)
|
|
|
|
@cog_ext.cog_slash(
    name="lock",
    description="Locks a channel",
    options=[
        create_option(
            name="reason",
            description="Lock Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lock duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
        create_option(
            name="channel",
            description="Channel to lock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lock(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """Lock a channel for every guild role and record the lock.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        reason: Lock reason, at most 100 characters.
        duration: Lock duration in minutes, stored with the record.
        channel: Channel to lock; the invoking channel when omitted.
    """
    await ctx.defer(hidden=True)
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    channel = channel or ctx.channel
    for guild_role in ctx.guild.roles:
        try:
            await self._lock_channel(channel, guild_role, ctx.author, reason)
        except Exception:
            # Best effort: a role that cannot be edited is skipped.
            pass
    lock_record = {
        "channel": channel.id,
        "guild": ctx.guild.id,
        "admin": ctx.author.id,
        "reason": reason,
        "duration": duration,
        "active": True,
        "time": datetime.now(),
    }
    self.db.jarvis.locks.insert_one(lock_record)
    await ctx.send(f"{channel.mention} locked for {duration} minute(s)")
|
|
|
|
@cog_ext.cog_slash(
    name="unlock",
    description="Unlocks a channel",
    options=[
        create_option(
            name="channel",
            description="Channel to unlock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _unlock(
    self,
    ctx: SlashContext,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """Unlock a channel that has an active lock record.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        channel: Channel to unlock; the invoking channel when omitted.
    """
    if not channel:
        channel = ctx.channel
    lock = self.db.jarvis.locks.find_one(
        {"guild": ctx.guild.id, "channel": channel.id, "active": True}
    )
    if not lock:
        await ctx.send(f"{channel.mention} not locked.", hidden=True)
        return
    for role in ctx.guild.roles:
        try:
            await self._unlock_channel(channel, role, ctx.author)
        except Exception:
            continue  # Just continue on error
    # Restrict the update to ACTIVE records: without that, the write could
    # hit an older, already-inactive lock for the same channel and leave
    # the current one active. update_many also clears duplicates.
    self.db.jarvis.locks.update_many(
        {
            "channel": channel.id,
            "guild": ctx.guild.id,
            "active": True,
        },
        {"$set": {"active": False}},
    )
    await ctx.send(f"{channel.mention} unlocked")
|
|
|
|
@cog_ext.cog_subcommand(
    base="lockdown",
    name="start",
    description="Locks a server",
    options=[
        create_option(
            name="reason",
            description="Lockdown Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lockdown duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lockdown_start(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
):
    """Lock every channel of the guild for every role.

    One lock record per channel is queued and written to
    ``jarvis.locks`` in a single bulk operation.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        reason: Lockdown reason, stored with each record.
        duration: Lockdown duration in minutes, stored with each record.
    """
    await ctx.defer()
    guild_roles = ctx.guild.roles
    pending_inserts = []
    for guild_channel in ctx.guild.channels:
        for guild_role in guild_roles:
            try:
                await self._lock_channel(
                    guild_channel, guild_role, ctx.author, reason
                )
            except Exception:
                # Best effort: skip role/channel pairs that fail.
                pass
        lock_record = {
            "channel": guild_channel.id,
            "guild": ctx.guild.id,
            "admin": ctx.author.id,
            "reason": reason,
            "duration": duration,
            "active": True,
            "time": datetime.now(),
        }
        pending_inserts.append(pymongo.InsertOne(lock_record))
    # bulk_write is skipped when there is nothing queued.
    if pending_inserts:
        self.db.jarvis.locks.bulk_write(pending_inserts)
    await ctx.send(f"Server locked for {duration} minute(s)")
|
|
|
|
@cog_ext.cog_subcommand(
    base="lockdown",
    name="end",
    description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
async def _lockdown_end(
    self,
    ctx: SlashContext,
):
    """Unlock every channel of the guild and deactivate its lock records.

    Replies with a hidden notice when the guild has no active lock
    records at all.
    """
    channels = ctx.guild.channels
    roles = ctx.guild.roles
    updates = []
    locks = list(
        self.db.jarvis.locks.find({"guild": ctx.guild.id, "active": True})
    )
    if not locks:
        await ctx.send("No lockdown detected.", hidden=True)
        return
    await ctx.defer()
    for channel in channels:
        for role in roles:
            try:
                await self._unlock_channel(channel, role, ctx.author)
            except Exception:
                continue  # Just continue on error
        # Match on channel + guild + active only. Filtering on the admin
        # ending the lockdown would miss locks created by someone else,
        # and UpdateMany clears duplicate active records for a channel.
        updates.append(
            pymongo.UpdateMany(
                {
                    "channel": channel.id,
                    "guild": ctx.guild.id,
                    "active": True,
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        self.db.jarvis.locks.bulk_write(updates)
    await ctx.send("Server unlocked")
|
|
|
|
@cog_ext.cog_slash(
    name="warn",
    description="Warn a user",
    options=[
        create_option(
            name="user",
            description="User to warn",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for warning",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Duration of warning in hours, default 24",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warn(
    self, ctx: SlashContext, user: User, reason: str, duration: int = 24
):
    """Record a warning for ``user`` and announce it with an embed.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        user: User being warned.
        reason: Warning reason, at most 100 characters.
        duration: Warning duration in hours, stored with the record.
    """
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    await ctx.defer()
    self.db.jarvis.warns.insert_one(
        {
            "user": user.id,
            "reason": reason,
            "admin": ctx.author.id,
            "time": datetime.now(),
            "guild": ctx.guild.id,
            "duration": duration,
            "active": True,
        }
    )
    # The active-warning count that used to be computed here was never
    # read, so the extra query has been dropped.
    fields = [Field("Reason", reason, False)]
    embed = build_embed(
        title="Warning",
        description=f"{user.mention} has been warned",
        fields=fields,
    )
    # ``user`` is annotated as User, which has no ``nick``; only use the
    # nickname when the object actually provides one.
    embed.set_author(
        name=getattr(user, "nick", None) or user.name,
        icon_url=user.avatar_url,
    )
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")

    await ctx.send(embed=embed)
|
|
|
|
@cog_ext.cog_slash(
    name="warnings",
    description="Get count of user warnings",
    options=[
        create_option(
            name="user",
            description="User to view",
            option_type=6,
            required=True,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warnings(self, ctx: SlashContext, user: User):
    """Show ``user``'s active warning reasons and total warning count.

    Args:
        ctx: Invocation context; supplies the guild.
        user: User whose warnings are listed.
    """
    await ctx.defer()
    warnings = list(
        self.db.jarvis.warns.find(
            {
                "user": user.id,
                "guild": ctx.guild.id,
            }
        )
    )
    active = [w["reason"] for w in warnings if w.get("active")]

    total = len(warnings)
    fields = [
        # The count is taken from the real list, so "no warnings" reads as
        # "0 Active". The "None" placeholder is used whenever nothing is
        # active, which also avoids an empty field value.
        Field(
            f"{len(active)} Active",
            "\n".join(active) if active else "None",
        ),
        Field("Total", total),
    ]
    embed = build_embed(
        title="Warnings",
        description=f"{user.mention} active and total warnings",
        fields=fields,
    )
    # ``user`` is annotated as User, which has no ``nick``; only use the
    # nickname when the object actually provides one.
    embed.set_author(
        name=getattr(user, "nick", None) or user.name,
        icon_url=user.avatar_url,
    )
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")

    await ctx.send(embed=embed)
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="block",
    description="Add a role to the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to add to blocklist",
            option_type=8,
            required=True,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
    """Add ``role`` to the guild's roleping blocklist setting.

    The setting document is created on first use; a role that is already
    listed produces a hidden notice and no write.
    """
    setting_filter = {"guild": ctx.guild.id, "setting": "roleping"}
    # Fall back to a fresh, empty setting when none is stored yet.
    setting = self.db.jarvis.settings.find_one(setting_filter) or {
        "guild": ctx.guild.id,
        "setting": "roleping",
        "value": [],
    }

    blocked_ids = setting["value"]
    if role.id in blocked_ids:
        await ctx.send(
            f"Role `{role.name}` already in blocklist.", hidden=True
        )
        return
    blocked_ids.append(role.id)
    self.db.jarvis.settings.update_one(
        setting_filter, {"$set": setting}, upsert=True
    )
    await ctx.send(f"Role `{role.name}` added to blocklist.")
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="allow",
    description="Remove a role from the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to remove from blocklist",
            option_type=8,
            required=True,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
    """Remove ``role`` from the guild's roleping blocklist setting.

    Replies with a hidden notice when no blocklist is configured or the
    role is not on it.
    """
    roles = self.db.jarvis.settings.find_one(
        {"guild": ctx.guild.id, "setting": "roleping"}
    )
    if not roles:
        await ctx.send("No blocklist configured.", hidden=True)
        return

    if role.id not in roles["value"]:
        await ctx.send(
            f"Role `{role.name}` not in blocklist.", hidden=True
        )
        return
    # Lists have ``remove``, not ``delete``.
    roles["value"].remove(role.id)
    self.db.jarvis.settings.update_one(
        {"guild": ctx.guild.id, "setting": "roleping"},
        {"$set": roles},
        upsert=True,
    )
    await ctx.send(f"Role `{role.name}` removed from blocklist.")
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="list",
    description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
    """Send the names of all roles on the guild's roleping blocklist.

    Stored role IDs that the guild can no longer resolve are left out.
    Hidden notices are sent when no blocklist is configured or it is
    empty.
    """
    setting = self.db.jarvis.settings.find_one(
        {"guild": ctx.guild.id, "setting": "roleping"}
    )
    if not setting:
        await ctx.send("No blocklist configured.", hidden=True)
        return

    blocked_ids = setting["value"]
    if not blocked_ids:
        await ctx.send("No roles blocklisted.", hidden=True)
        return

    resolved = (ctx.guild.get_role(role_id) for role_id in blocked_ids)
    # One name per line, each newline-terminated, inside a code fence.
    listing = "".join(found.name + "\n" for found in resolved if found)
    await ctx.send("Blocklisted Roles:\n```\n" + listing + "```")
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="add",
    description="Automatically purge messages after x seconds",
    options=[
        create_option(
            name="channel",
            description="Channel to autopurge",
            option_type=7,
            required=True,
        ),
        create_option(
            name="delay",
            description="Seconds to keep message before purge, default 30",
            option_type=4,
            required=False,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_add(
    self, ctx: SlashContext, channel: TextChannel, delay: int = 30
):
    """Register an autopurge for ``channel``.

    Args:
        ctx: Invocation context; supplies the guild and the acting admin.
        channel: Channel whose messages should be purged automatically.
        delay: Seconds to keep a message before it is purged.
    """
    # find_one, not find: ``find`` returns a cursor object, which is
    # truthy even with no matches, so the command always reported an
    # existing autopurge.
    autopurge = self.db.jarvis.autopurge.find_one(
        {"guild": ctx.guild.id, "channel": channel.id}
    )
    if autopurge:
        await ctx.send("Autopurge already exists.", hidden=True)
        return
    autopurge = {
        "guild": ctx.guild.id,
        "channel": channel.id,
        "admin": ctx.author.id,
        "delay": delay,
        "time": datetime.utcnow(),
    }
    self.db.jarvis.autopurge.insert_one(autopurge)
    await ctx.send(
        f"Autopurge set up on {channel.mention}, "
        + f"delay is {delay} seconds"
    )
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="remove",
    description="Remove an autopurge",
    options=[
        create_option(
            name="channel",
            description="Channel to remove from autopurge",
            option_type=7,
            required=True,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel):
    """Delete the autopurge registered for ``channel``.

    Replies with a hidden notice when the channel has no autopurge.
    """
    # find_one, not find: a cursor is always truthy and cannot be indexed
    # with "_id", so the lookup below needs the document itself.
    autopurge = self.db.jarvis.autopurge.find_one(
        {"guild": ctx.guild.id, "channel": channel.id}
    )
    if not autopurge:
        await ctx.send("Autopurge does not exist.", hidden=True)
        return
    self.db.jarvis.autopurge.delete_one({"_id": autopurge["_id"]})
    await ctx.send(f"Autopurge removed from {channel.mention}.")
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="update",
    description="Update autopurge on a channel",
    options=[
        create_option(
            name="channel",
            description="Channel to update",
            option_type=7,
            required=True,
        ),
        create_option(
            name="delay",
            description="New time to save",
            option_type=4,
            required=True,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_update(
    self, ctx: SlashContext, channel: TextChannel, delay: int
):
    """Change the delay of the autopurge registered for ``channel``.

    Replies with a hidden notice when the channel has no autopurge.
    """
    lookup = {"guild": ctx.guild.id, "channel": channel.id}
    existing = self.db.jarvis.autopurge.find_one(lookup)
    if existing:
        # Address the stored document by its own id for the write.
        self.db.jarvis.autopurge.update_one(
            {"_id": existing["_id"]}, {"$set": {"delay": delay}}
        )
        await ctx.send(
            f"Autopurge delay updated to {delay} seconds on {channel.mention}."
        )
    else:
        await ctx.send("Autopurge does not exist.", hidden=True)
|
|
|
|
|
|
def setup(bot):
    """Create an ``AdminCog`` bound to ``bot`` and add it to the bot."""
    admin_cog = AdminCog(bot)
    bot.add_cog(admin_cog)
|