1316 lines
42 KiB
Python
1316 lines
42 KiB
Python
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Union
|
|
|
|
import pymongo
|
|
from ButtonPaginator import Paginator
|
|
from discord import Member, Role, TextChannel, User, VoiceChannel
|
|
from discord.ext import commands
|
|
from discord.ext.tasks import loop
|
|
from discord.utils import find, get
|
|
from discord_slash import SlashContext, cog_ext
|
|
from discord_slash.model import ButtonStyle
|
|
from discord_slash.utils.manage_commands import create_choice, create_option
|
|
|
|
import jarvis
|
|
from jarvis.db import DBManager
|
|
from jarvis.db.types import (
|
|
Autopurge,
|
|
Ban,
|
|
Kick,
|
|
Lock,
|
|
MongoSort,
|
|
Mute,
|
|
Purge,
|
|
Setting,
|
|
Unban,
|
|
Warning,
|
|
)
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.field import Field
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class AdminCog(commands.Cog):
|
|
"""
|
|
Guild admin functions
|
|
|
|
Used to manage guilds
|
|
"""
|
|
|
|
def __init__(self, bot: commands.Bot):
    """
    Set up the cog.

    Keeps a reference to the bot, opens the ``jarvis`` database through
    DBManager using the loaded configuration, creates the empty
    interaction cache and starts the cache expiry loop.

    :param bot: Bot instance this cog is attached to
    """
    self.bot = bot
    # Filled by the paginated commands, emptied by _expire_interaction.
    self.cache = {}
    mongo_settings = jarvis.config.get_config().mongo
    self.db = DBManager(mongo_settings).mongo.jarvis
    self._expire_interaction.start()
|
|
|
|
def check_cache(self, ctx: SlashContext, **kwargs):
    """
    Find a cached interaction that matches the current invocation.

    An entry matches when its "command", "user" and "guild" values equal
    ``ctx.subcommand_name``, ``ctx.author.id`` and ``ctx.guild.id``, and
    every extra keyword argument equals the entry's value under the same
    key. An entry that lacks a requested key does not match.

    :param ctx: Context of the command being invoked
    :param kwargs: Additional key/value pairs the entry must contain
    :return: First matching entry of ``self.cache``, or None if none match
    """
    # Sentinel so that a missing key never compares equal to a requested
    # value (not even a requested None).
    missing = object()

    def matches(entry) -> bool:
        return (
            entry["command"] == ctx.subcommand_name
            and entry["user"] == ctx.author.id
            and entry["guild"] == ctx.guild.id
            and all(
                entry.get(key, missing) == value
                for key, value in kwargs.items()
            )
        )

    return next(
        (entry for entry in self.cache.values() if matches(entry)), None
    )
|
|
|
|
@cog_ext.cog_slash(
    name="ban",
    description="Ban a user",
    options=[
        create_option(
            name="user",
            description="User to ban",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Ban reason",
            required=True,
            option_type=3,
        ),
        create_option(
            name="type",
            description="Ban type",
            option_type=3,
            required=False,
            choices=[
                create_choice(value="perm", name="Permanent"),
                create_choice(value="temp", name="Temporary"),
                create_choice(value="soft", name="Soft"),
            ],
        ),
        create_option(
            name="duration",
            description="Ban duration in hours if temporary",
            required=False,
            option_type=4,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _ban(
    self,
    ctx: SlashContext,
    user: User = None,
    reason: str = None,
    type: str = "perm",
    duration: int = 4,
):
    """
    Ban a user from the guild and record the ban.

    Tries to DM the user an embed with the reason, bans them (and
    immediately unbans again for a soft ban), replies with a summary
    embed and inserts a Ban record. The record stores a duration only
    for temporary bans and is marked inactive for soft bans.

    :param ctx: Command context
    :param user: User to ban
    :param reason: Ban reason, at most 100 characters; a stock reason is
        used when empty
    :param type: "perm", "temp" or "soft"
    :param duration: Hours the ban lasts, only checked for "temp" bans
    """
    if not user or user == ctx.author:
        await ctx.send("You cannot ban yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if type == "temp" and duration < 0:
        await ctx.send(
            "You cannot set a temp ban to < 0 hours.", hidden=True
        )
        return
    elif type == "temp" and duration > 744:
        await ctx.send(
            "You cannot set a temp ban to > 1 month", hidden=True
        )
        return
    # Apply the fallback before measuring: reason defaults to None and
    # len(None) would raise.
    if not reason:
        reason = (
            "Mr. Stark is displeased with your presence. Please leave."
        )
    elif len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    mtype = "perma" if type == "perm" else type

    fields = [Field(name="Type", value=mtype)]
    if mtype == "temp":
        fields.append(Field(name="Duration", value=f"{duration} hour(s)"))

    user_embed = build_embed(
        title="You have been banned",
        description=f"Reason: {reason}",
        fields=fields,
    )
    user_embed.set_author(
        name=ctx.author.name + "#" + ctx.author.discriminator,
        icon_url=ctx.author.avatar_url,
    )
    user_embed.set_thumbnail(url=ctx.guild.icon_url)

    # Set the flag before the attempt and never reset it afterwards, so
    # the "DM Sent?" field reports what actually happened.
    send_failed = False
    try:
        await user.send(embed=user_embed)
    except Exception:
        # Best effort only: a failed DM must not block the ban.
        send_failed = True
    try:
        await ctx.guild.ban(user, reason=reason)
    except Exception as e:
        await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True)
        return
    if mtype == "soft":
        await ctx.guild.unban(user, reason="Ban was softban")

    fields.append(Field(name="DM Sent?", value=str(not send_failed)))

    admin_embed = build_embed(
        title="User Banned",
        description=f"Reason: {reason}",
        fields=fields,
    )
    admin_embed.set_author(
        # Only guild members carry a nickname; a plain User has no
        # ``nick`` attribute at all.
        name=getattr(user, "nick", None) or user.name,
        icon_url=user.avatar_url,
    )
    admin_embed.set_thumbnail(url=user.avatar_url)
    admin_embed.set_footer(
        text=f"{user.name}#{user.discriminator} | {user.id}"
    )

    await ctx.send(embed=admin_embed)

    _ = Ban(
        user=user.id,
        username=user.name,
        discrim=user.discriminator,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        type=type,
        duration=duration if type == "temp" else None,
        # A soft ban is lifted right away, so it is never active.
        active=type != "soft",
    ).insert()
|
|
|
|
async def discord_apply_unban(
    self, ctx: SlashContext, user: User, reason: str
):
    """
    Lift a Discord ban, record it and announce it.

    Unbans the user in ``ctx.guild``, inserts an Unban record and replies
    with a "User Unbanned" embed.

    :param ctx: Command context
    :param user: User to unban
    :param reason: Reason passed to Discord and stored in the record
    """
    guild = ctx.guild
    await guild.unban(user, reason=reason)

    record = Unban(
        user=user.id,
        username=user.name,
        discrim=user.discriminator,
        guild=guild.id,
        admin=ctx.author.id,
        reason=reason,
    )
    record.insert()

    avatar = user.avatar_url
    reason_field = Field(name="Reason", value=reason)
    notice = build_embed(
        title="User Unbanned",
        description=f"<@{user.id}> was unbanned",
        fields=[reason_field],
    )
    notice.set_author(name=user.name, icon_url=avatar)
    notice.set_thumbnail(url=avatar)
    notice.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=notice)
|
|
|
|
@cog_ext.cog_slash(
    name="unban",
    description="Unban a user",
    options=[
        create_option(
            name="user",
            description="User to unban",
            option_type=3,
            required=True,
        ),
        create_option(
            name="reason",
            description="Unban reason",
            required=True,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _unban(
    self,
    ctx: SlashContext,
    user: str,
    reason: str,
):
    """
    Unban a user identified by ID, ``name#discrim`` or plain name.

    The guild's Discord ban list is searched first. When nothing is found
    there, the Ban records are searched for an active entry. A match in
    Discord is lifted through discord_apply_unban; a match that exists
    only in the database is marked inactive and an Unban record is
    inserted.

    :param ctx: Command context
    :param user: Numeric user ID, ``name#1234`` or user name
    :param reason: Unban reason, at most 100 characters
    """
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    orig_user = user
    discrim = None
    discord_ban_info = None
    database_ban_info = None

    bans = await ctx.guild.bans()

    # Try to get ban information out of Discord
    if re.match(r"^[0-9]{1,}$", user):  # User ID
        user = int(user)
        discord_ban_info = find(lambda x: x.user.id == user, bans)
    else:  # User name
        # re.search, not re.match: the discriminator sits at the END of
        # the string. rsplit keeps any '#' inside the name itself.
        if re.search(r"#[0-9]{4}$", user):  # User name has discrim
            user, discrim = user.rsplit("#", 1)
        if discrim:
            discord_ban_info = find(
                lambda x: x.user.name == user
                and x.user.discriminator == discrim,
                bans,
            )
        else:
            results = [x for x in bans if x.user.name == user]
            if len(results) > 1:
                # Only list the ambiguous matches, not every ban.
                active_bans = [
                    f"{ban.user.name} ({ban.user.id}): {ban.reason}"
                    for ban in results
                ]
                message = (
                    "More than one result. "
                    + "Please use one of the following IDs:\n```"
                    + "\n".join(active_bans)
                    + "\n```"
                )
                await ctx.send(message)
                return
            if results:
                discord_ban_info = results[0]

    # If we don't have the ban information in Discord,
    # try to find the relevant information in the database.
    # We take advantage of the previous checks to save CPU cycles
    if not discord_ban_info:
        if isinstance(user, int):
            database_ban_info = Ban.get(
                guild=ctx.guild.id, user=user, active=True
            )
        else:
            search = {
                "guild": ctx.guild.id,
                "username": user,
                "active": True,
            }
            if discrim:
                search["discrim"] = discrim
            database_ban_info = Ban.get(**search)

    if not discord_ban_info and not database_ban_info:
        await ctx.send(f"Unable to find user {orig_user}", hidden=True)

    elif discord_ban_info:
        await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
    else:
        # The record stores the Discord user ID under ``user`` (the same
        # attribute read when building the Unban record below).
        discord_ban_info = find(
            lambda x: x.user.id == database_ban_info.user, bans
        )
        if discord_ban_info:
            await self.discord_apply_unban(
                ctx, discord_ban_info.user, reason
            )
        else:
            database_ban_info.active = False
            database_ban_info.update()
            _ = Unban(
                user=database_ban_info.user,
                username=database_ban_info.username,
                discrim=database_ban_info.discrim,
                guild=ctx.guild.id,
                admin=ctx.author.id,
                reason=reason,
            ).insert()
            await ctx.send(
                "Unable to find user in Discord, "
                + "but removed entry from database."
            )
|
|
|
|
@cog_ext.cog_subcommand(
    base="bans",
    name="list",
    description="List bans",
    options=[
        create_option(
            name="type",
            description="Ban type",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=0, name="All"),
                create_choice(value=1, name="Permanent"),
                create_choice(value=2, name="Temporary"),
                create_choice(value=3, name="Soft"),
            ],
        ),
        create_option(
            name="active",
            description="Active bans",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=1, name="Yes"),
                create_choice(value=0, name="No"),
            ],
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _bans_list(
    self, ctx: SlashContext, type: int = 0, active: int = 1
):
    """
    Show the guild's bans in a paginated embed, five per page.

    Ban records come from the database, newest first. When all types are
    requested, Discord bans without a record are appended as "manual"
    bans. If the same user already has a live paginator for the same
    arguments, a link to it is sent instead of a new one.

    :param ctx: Command context
    :param type: 0 = all, 1 = permanent, 2 = temporary, 3 = soft
    :param active: 1 restricts the records to active bans, 0 does not
        filter on the active flag
    """
    active = bool(active)
    exists = self.check_cache(ctx, type=type, active=active)
    if exists:
        await ctx.defer(hidden=True)
        await ctx.send(
            "Please use existing interaction: "
            + f"{exists['paginator']._message.jump_url}",
            hidden=True,
        )
        return
    types = [0, "perm", "temp", "soft"]
    search = {"guild": ctx.guild.id}
    if active:
        search["active"] = True
    if type > 0:
        search["type"] = types[type]
    bans = Ban.get_many(**search)
    bans.sort(key=lambda x: x.created_at, reverse=True)
    db_bans = []
    fields = []
    for ban in bans:
        if not ban.username:
            user = await self.bot.fetch_user(ban.user)
            ban.username = user.name if user else "[deleted user]"
        fields.append(
            Field(
                name=f"Username: {ban.username}#{ban.discrim}",
                value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n"
                + f"User ID: {ban.user}\n"
                + f"Reason: {ban.reason}\n"
                + f"Type: {ban.type}\n\u200b",
                inline=False,
            )
        )
        db_bans.append(ban.user)
    if type == 0:
        # Kept in its own name so the database result is not clobbered.
        discord_bans = await ctx.guild.bans()
        for ban in discord_bans:
            if ban.user.id not in db_bans:
                fields.append(
                    Field(
                        name=f"Username: {ban.user.name}#"
                        + f"{ban.user.discriminator}",
                        value="Date: [unknown]\n"
                        + f"User ID: {ban.user.id}\n"
                        + f"Reason: {ban.reason}\n"
                        + "Type: manual\n\u200b",
                        inline=False,
                    )
                )

    pages = []
    title = "Active " if active else "Inactive "
    if type > 0:
        title += types[type]
    if type == 1:
        title += "a"  # "perm" -> "perma"
    title += "bans"
    if len(fields) == 0:
        embed = build_embed(
            title=title,
            description=f"No {'in' if not active else ''}active bans",
            fields=[],
        )
        embed.set_thumbnail(url=ctx.guild.icon_url)
        pages.append(embed)
    else:
        # Page over the fields actually collected; the number of ban
        # objects can differ from the number of fields.
        for i in range(0, len(fields), 5):
            embed = build_embed(
                title=title, description="", fields=fields[i : i + 5]
            )
            embed.set_thumbnail(url=ctx.guild.icon_url)
            pages.append(embed)

    paginator = Paginator(
        bot=self.bot,
        ctx=ctx,
        embeds=pages,
        only=ctx.author,
        timeout=60 * 5,  # 5 minute timeout
        disable_after_timeout=True,
        use_extend=len(pages) > 2,
        left_button_style=ButtonStyle.grey,
        right_button_style=ButtonStyle.grey,
        basic_buttons=["◀", "▶"],
    )

    self.cache[hash(paginator)] = {
        "guild": ctx.guild.id,
        "user": ctx.author.id,
        "timeout": datetime.utcnow() + timedelta(minutes=5),
        "command": ctx.subcommand_name,
        "type": type,
        "active": active,
        "paginator": paginator,
    }

    await paginator.start()
|
|
|
|
@cog_ext.cog_slash(
    name="kick",
    description="Kick a user",
    options=[
        create_option(
            name="user",
            description="User to kick",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Kick reason",
            required=False,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(kick_members=True)
async def _kick(self, ctx: SlashContext, user: User, reason=None):
    """
    Kick a user from the guild and record the kick.

    Tries to DM the user an embed with the reason, kicks them, replies
    with a summary embed and inserts a Kick record.

    :param ctx: Command context
    :param user: User to kick
    :param reason: Optional kick reason, at most 100 characters; a stock
        reason is used when omitted
    """
    if not user or user == ctx.author:
        await ctx.send("You cannot kick yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    # The reason is optional, so fall back first; len(None) would raise.
    if not reason:
        reason = (
            "Mr. Stark is displeased with your presence. Please leave."
        )
    elif len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    guild_name = ctx.guild.name
    embed = build_embed(
        title=f"You have been kicked from {guild_name}",
        description=f"Reason: {reason}",
        fields=[],
    )

    embed.set_author(
        name=ctx.author.name + "#" + ctx.author.discriminator,
        icon_url=ctx.author.avatar_url,
    )
    # ``url`` has to be passed by keyword.
    embed.set_thumbnail(url=ctx.guild.icon_url)

    send_failed = False
    try:
        await user.send(embed=embed)
    except Exception:
        # Best effort only: a failed DM must not block the kick.
        send_failed = True
    await ctx.guild.kick(user, reason=reason)

    fields = [Field(name="DM Sent?", value=str(not send_failed))]
    embed = build_embed(
        title="User Kicked",
        description=f"Reason: {reason}",
        fields=fields,
    )

    embed.set_author(
        name=user.nick if user.nick else user.name,
        icon_url=user.avatar_url,
    )
    embed.set_thumbnail(url=user.avatar_url)
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")

    await ctx.send(embed=embed)
    _ = Kick(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
    ).insert()
|
|
|
|
@cog_ext.cog_slash(
    name="purge",
    description="Purge messages from channel",
    options=[
        create_option(
            name="amount",
            description="Amount of messages to purge",
            required=False,
            option_type=4,
        )
    ],
)
@admin_or_permissions(manage_messages=True)
async def _purge(self, ctx: SlashContext, amount: int = 10):
    """
    Delete the most recent messages of the current channel.

    Reads ``amount + 1`` messages from the channel history, deletes them
    and inserts a Purge record with the requested amount.

    :param ctx: Command context
    :param amount: Number of messages to purge, must be >= 1
    """
    if amount < 1:
        await ctx.send("Amount must be >= 1", hidden=True)
        return
    await ctx.defer()
    channel = ctx.channel
    messages = [
        message async for message in channel.history(limit=amount + 1)
    ]
    # A single bulk delete call accepts at most 100 messages, so larger
    # purges are sent in batches.
    batch_size = 100
    for start in range(0, len(messages), batch_size):
        await channel.delete_messages(messages[start : start + batch_size])
    _ = Purge(
        channel=ctx.channel.id,
        guild=ctx.guild.id,
        admin=ctx.author.id,
        count=amount,
    ).insert()
|
|
|
|
@cog_ext.cog_slash(
    name="mute",
    description="Mute a user",
    options=[
        create_option(
            name="user",
            description="User to mute",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for mute",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Mute duration",
            option_type=4,
            required=False,
        ),
    ],
)
@admin_or_permissions(mute_members=True)
async def _mute(
    self, ctx: SlashContext, user: Member, reason: str, duration: int = 30
):
    """
    Give a member the guild's configured mute role and record the mute.

    A duration outside 0..300 is stored as -1, and the record is then
    inserted as inactive.

    :param ctx: Command context
    :param user: Member to mute
    :param reason: Reason for the mute, at most 100 characters
    :param duration: Mute duration
    """
    if user == ctx.author:
        await ctx.send("You cannot mute yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    setting = Setting.get(guild=ctx.guild.id, setting="mute")
    if not setting:
        await ctx.send(
            "Please configure a mute role "
            + "with /settings mute <role> first",
            hidden=True,
        )
        return

    mute_role = get(ctx.guild.roles, id=setting.value)
    if mute_role in user.roles:
        await ctx.send("User already muted", hidden=True)
        return
    await user.add_roles(mute_role, reason=reason)

    if not 0 <= duration <= 300:
        duration = -1
    record = Mute(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        duration=duration,
        active=duration >= 0,
    )
    record.insert()

    avatar = user.avatar_url
    notice = build_embed(
        title="User Muted",
        description=f"{user.mention} has been muted",
        fields=[Field(name="Reason", value=reason)],
    )
    notice.set_author(name=user.nick or user.name, icon_url=avatar)
    notice.set_thumbnail(url=avatar)
    notice.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=notice)
|
|
|
|
@cog_ext.cog_slash(
    name="unmute",
    description="Unmute a user",
    options=[
        create_option(
            name="user",
            description="User to unmute",
            option_type=6,
            required=True,
        )
    ],
)
@admin_or_permissions(mute_members=True)
async def _unmute(self, ctx: SlashContext, user: Member):
    """
    Remove the guild's mute role from a member.

    Every Mute record of the member in this guild is marked inactive and
    a confirmation embed is sent.

    :param ctx: Command context
    :param user: Member to unmute
    """
    mute_setting = Setting.get(guild=ctx.guild.id, setting="mute")
    if not mute_setting:
        await ctx.send(
            "Please configure a mute role with "
            + "/settings mute <role> first.",
            hidden=True,
        )
        return

    role = get(ctx.guild.roles, id=mute_setting.value)
    if role not in user.roles:
        await ctx.send("User is not muted.", hidden=True)
        return
    await user.remove_roles(role, reason="Unmute")

    mutes = Mute.get_many(guild=ctx.guild.id, user=user.id)
    for mute in mutes:
        mute.active = False
        mute.update()
    embed = build_embed(
        title="User Unmuted",
        description=f"{user.mention} has been unmuted",
        fields=[],
    )
    embed.set_author(
        name=user.nick if user.nick else user.name,
        icon_url=user.avatar_url,
    )
    embed.set_thumbnail(url=user.avatar_url)
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=embed)
|
|
|
|
async def _lock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
    reason: str,
    allow_send=False,
):
    """
    Set a role's send/speak permission overwrite on a channel.

    ``send_messages`` is set for text channels and ``speak`` for voice
    channels. For any other channel type the role's existing overwrite
    is written back unchanged.

    :param channel: Channel to lock
    :param role: Role whose overwrite is changed
    :param admin: Admin requesting the lock (not used here)
    :param reason: Reason passed along with the permission change
    :param allow_send: Value assigned to the permission
    """
    if isinstance(channel, TextChannel):
        permission = "send_messages"
    elif isinstance(channel, VoiceChannel):
        permission = "speak"
    else:
        permission = None

    overwrite = channel.overwrites_for(role)
    if permission is not None:
        setattr(overwrite, permission, allow_send)
    await channel.set_permissions(role, overwrite=overwrite, reason=reason)
|
|
|
|
async def _unlock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
):
    """
    Clear a role's send/speak permission overwrite on a channel.

    ``send_messages`` is reset to None for text channels and ``speak``
    for voice channels. For any other channel type the role's existing
    overwrite is written back unchanged.

    :param channel: Channel to unlock
    :param role: Role whose overwrite is changed
    :param admin: Admin requesting the unlock (not used here)
    """
    if isinstance(channel, TextChannel):
        permission = "send_messages"
    elif isinstance(channel, VoiceChannel):
        permission = "speak"
    else:
        permission = None

    overwrite = channel.overwrites_for(role)
    if permission is not None:
        setattr(overwrite, permission, None)
    await channel.set_permissions(role, overwrite=overwrite)
|
|
|
|
@cog_ext.cog_slash(
    name="lock",
    description="Locks a channel",
    options=[
        create_option(
            name="reason",
            description="Lock Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lock duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
        create_option(
            name="channel",
            description="Channel to lock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lock(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """
    Lock one channel for every role of the guild and record the lock.

    :param ctx: Command context
    :param reason: Lock reason, at most 100 characters
    :param duration: Lock duration in minutes, between 1 and 299
    :param channel: Channel to lock; the current channel when omitted
    """
    await ctx.defer(hidden=True)

    problem = None
    if duration <= 0:
        problem = "Duration must be > 0"
    elif duration >= 300:
        problem = "Duration must be < 5 hours"
    elif len(reason) > 100:
        problem = "Reason must be < 100 characters"
    if problem is not None:
        await ctx.send(problem, hidden=True)
        return

    target = channel or ctx.channel
    for role in ctx.guild.roles:
        try:
            await self._lock_channel(target, role, ctx.author, reason)
        except Exception:
            # A role that cannot be changed must not stop the others.
            pass

    record = Lock(
        channel=target.id,
        guild=ctx.guild.id,
        admin=ctx.author.id,
        reason=reason,
        duration=duration,
    )
    record.insert()
    await ctx.send(f"{target.mention} locked for {duration} minute(s)")
|
|
|
|
@cog_ext.cog_slash(
    name="unlock",
    description="Unlocks a channel",
    options=[
        create_option(
            name="channel",
            description="Channel to unlock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _unlock(
    self,
    ctx: SlashContext,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """
    Unlock a channel that has an active Lock record.

    The send/speak overwrite is cleared for every role of the guild and
    the Lock record is marked inactive.

    :param ctx: Command context
    :param channel: Channel to unlock; the current channel when omitted
    """
    if not channel:
        channel = ctx.channel
    lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True)
    if not lock:
        await ctx.send(f"{channel.mention} not locked.", hidden=True)
        return
    for role in ctx.guild.roles:
        try:
            await self._unlock_channel(channel, role, ctx.author)
        except Exception:
            continue  # Just continue on error
    lock.active = False
    lock.update()
    await ctx.send(f"{channel.mention} unlocked")
|
|
|
|
@cog_ext.cog_subcommand(
    base="lockdown",
    name="start",
    description="Locks a server",
    options=[
        create_option(
            name="reason",
            description="Lockdown Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lockdown duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lockdown_start(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
):
    """
    Lock every channel of the guild for every role.

    One lock document per channel is written to the ``locks`` collection
    in a single bulk operation.

    :param ctx: Command context
    :param reason: Lockdown reason
    :param duration: Lockdown duration in minutes, between 1 and 299
    """
    await ctx.defer(hidden=True)
    if duration <= 0:
        await ctx.send("Duration must be > 0", hidden=True)
        return
    if duration >= 300:
        await ctx.send("Duration must be < 5 hours", hidden=True)
        return

    guild = ctx.guild
    pending = []
    for channel in guild.channels:
        for role in guild.roles:
            try:
                await self._lock_channel(channel, role, ctx.author, reason)
            except Exception:
                # A role that cannot be changed must not stop the others.
                pass
        document = {
            "channel": channel.id,
            "guild": guild.id,
            "admin": ctx.author.id,
            "reason": reason,
            "duration": duration,
            "active": True,
            "created_at": datetime.utcnow(),
        }
        pending.append(pymongo.InsertOne(document))

    if pending:
        self.db.locks.bulk_write(pending)
    await ctx.send(f"Server locked for {duration} minute(s)")
|
|
|
|
@cog_ext.cog_subcommand(
    base="lockdown",
    name="end",
    description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
async def _lockdown_end(
    self,
    ctx: SlashContext,
):
    """
    Unlock every channel of the guild for every role.

    Requires at least one active Lock record for the guild. All active
    lock documents of each channel are marked inactive in one bulk
    operation.

    :param ctx: Command context
    """
    channels = ctx.guild.channels
    roles = ctx.guild.roles
    updates = []
    locks = Lock.get_many(guild=ctx.guild.id, active=True)
    if not locks:
        await ctx.send("No lockdown detected.", hidden=True)
        return
    await ctx.defer()
    for channel in channels:
        for role in roles:
            try:
                await self._unlock_channel(channel, role, ctx.author)
            except Exception:
                continue  # Just continue on error
        updates.append(
            # Match on the active flag rather than on the admin: the
            # channel is unlocked no matter who locked it, so every
            # active lock on it has to be closed.
            pymongo.UpdateMany(
                {
                    "channel": channel.id,
                    "guild": ctx.guild.id,
                    "active": True,
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        self.db.locks.bulk_write(updates)
    await ctx.send("Server unlocked")
|
|
|
|
@cog_ext.cog_slash(
    name="warn",
    description="Warn a user",
    options=[
        create_option(
            name="user",
            description="User to warn",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for warning",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Duration of warning in hours, default 24",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warn(
    self, ctx: SlashContext, user: User, reason: str, duration: int = 24
):
    """
    Record an active warning for a user and announce it.

    :param ctx: Command context
    :param user: User to warn
    :param reason: Reason for the warning, at most 100 characters
    :param duration: Duration of the warning in hours, between 1 and 119
    """
    problem = None
    if len(reason) > 100:
        problem = "Reason must be < 100 characters"
    elif duration <= 0:
        problem = "Duration must be > 0"
    elif duration >= 120:
        problem = "Duration must be < 5 days"
    if problem is not None:
        await ctx.send(problem, hidden=True)
        return

    await ctx.defer()
    record = Warning(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        duration=duration,
        active=True,
    )
    record.insert()

    notice = build_embed(
        title="Warning",
        description=f"{user.mention} has been warned",
        fields=[Field("Reason", reason, False)],
    )
    notice.set_author(
        name=user.nick or user.name,
        icon_url=user.avatar_url,
    )
    notice.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")

    await ctx.send(embed=notice)
|
|
|
|
@cog_ext.cog_slash(
    name="warnings",
    description="Get count of user warnings",
    options=[
        create_option(
            name="user",
            description="User to view",
            option_type=6,
            required=True,
        ),
        create_option(
            name="active",
            description="View only active",
            option_type=4,
            required=False,
            choices=[
                create_choice(name="Yes", value=1),
                create_choice(name="No", value=0),
            ],
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warnings(self, ctx: SlashContext, user: User, active: int = 1):
    """
    Show a user's warnings in a paginated embed, five per page.

    With ``active`` set, only active warnings are listed, together with
    the admin who issued each one. Otherwise every warning is listed and
    prefixed with "[A]" (active) or "[I]" (inactive). If the same user
    already has a live paginator for the same arguments, a link to it is
    sent instead of a new one.

    :param ctx: Command context
    :param user: User whose warnings are shown
    :param active: 1 to list only active warnings, 0 to list all
    """
    active = bool(active)
    exists = self.check_cache(ctx, user_id=user.id, active=active)
    if exists:
        await ctx.defer(hidden=True)
        await ctx.send(
            "Please use existing interaction: "
            + f"{exists['paginator']._message.jump_url}",
            hidden=True,
        )
        return
    warnings = Warning.get_many(
        user=user.id,
        guild=ctx.guild.id,
        sort=MongoSort(direction="desc", key="created_at"),
    )
    active_warns = list(filter(lambda x: x.active, warnings))
    summary = (
        f"{len(warnings)} total | "
        + f"{len(active_warns)} currently active"
    )

    fields = []
    if active:
        for warn in active_warns:
            # Guild has no ``get``; members are looked up by ID with
            # get_member, which returns None for unknown members.
            admin = ctx.guild.get_member(warn.admin)
            admin_name = "||`[redacted]`||"
            if admin:
                admin_name = f"{admin.name}#{admin.discriminator}"
            fields.append(
                Field(
                    name=warn.created_at.strftime(
                        "%Y-%m-%d %H:%M:%S UTC"
                    ),
                    value=f"{warn.reason}\n"
                    + f"Admin: {admin_name}\n"
                    + "\u200b",
                    inline=False,
                )
            )
    else:
        for warn in warnings:
            title = "[A] " if warn.active else "[I] "
            title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")
            fields.append(
                Field(
                    name=title,
                    value=warn.reason + "\n\u200b",
                    inline=False,
                )
            )

    pages = []
    if not fields:
        # Always produce one page so the paginator has something to
        # show, in both the active-only and the full view.
        embed = build_embed(
            title="Warnings",
            description=summary,
            fields=[],
        )
        embed.set_author(name=user.name, icon_url=user.avatar_url)
        embed.set_thumbnail(url=ctx.guild.icon_url)
        pages.append(embed)
    else:
        for i in range(0, len(fields), 5):
            embed = build_embed(
                title="Warnings",
                description=summary,
                fields=fields[i : i + 5],
            )
            embed.set_author(
                name=user.name + "#" + user.discriminator,
                icon_url=user.avatar_url,
            )
            embed.set_thumbnail(url=ctx.guild.icon_url)
            if active:
                embed.set_footer(
                    text=f"{user.name}#{user.discriminator} | {user.id}"
                )
            pages.append(embed)

    paginator = Paginator(
        bot=self.bot,
        ctx=ctx,
        embeds=pages,
        only=ctx.author,
        timeout=60 * 5,  # 5 minute timeout
        disable_after_timeout=True,
        use_extend=len(pages) > 2,
        left_button_style=ButtonStyle.grey,
        right_button_style=ButtonStyle.grey,
        basic_buttons=["◀", "▶"],
    )

    self.cache[hash(paginator)] = {
        "guild": ctx.guild.id,
        "user": ctx.author.id,
        "timeout": datetime.utcnow() + timedelta(minutes=5),
        "command": ctx.subcommand_name,
        "user_id": user.id,
        "active": active,
        "paginator": paginator,
    }

    await paginator.start()
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="block",
    description="Add a role to the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to add to blocklist",
            option_type=8,
            required=True,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
    """
    Add a role to the guild's "roleping" setting.

    A new Setting with an empty list is created when the guild has none.

    :param ctx: Command context
    :param role: Role to add to the blocklist
    """
    guild_id = ctx.guild.id
    blocklist = Setting.get(guild=guild_id, setting="roleping") or Setting(
        guild=guild_id, setting="roleping", value=[]
    )

    if role.id in blocklist.value:
        await ctx.send(
            f"Role `{role.name}` already in blocklist.", hidden=True
        )
        return

    blocklist.value.append(role.id)
    blocklist.update()
    await ctx.send(f"Role `{role.name}` added to blocklist.")
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="allow",
    description="Remove a role from the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to remove from blocklist",
            option_type=8,
            required=True,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
    """
    Remove a role from the guild's "roleping" setting.

    :param ctx: Command context
    :param role: Role to remove from the blocklist
    """
    roles = Setting.get(guild=ctx.guild.id, setting="roleping")
    if not roles:
        await ctx.send("No blocklist configured.", hidden=True)
        return

    if role.id not in roles.value:
        await ctx.send(
            f"Role `{role.name}` not in blocklist.", hidden=True
        )
        return
    roles.value.remove(role.id)
    roles.update()
    await ctx.send(f"Role `{role.name}` removed from blocklist.")
|
|
|
|
@cog_ext.cog_subcommand(
    base="roleping",
    name="list",
    description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
    """
    Send the names of the roles stored in the "roleping" setting.

    Stored IDs that no longer resolve to a role in the guild are skipped.

    :param ctx: Command context
    """
    blocklist = Setting.get(guild=ctx.guild.id, setting="roleping")
    if not blocklist:
        await ctx.send("No blocklist configured.", hidden=True)
        return
    if not blocklist.value:
        await ctx.send("No roles blocklisted.", hidden=True)
        return

    resolved = (ctx.guild.get_role(role_id) for role_id in blocklist.value)
    listing = "".join(role.name + "\n" for role in resolved if role)
    await ctx.send("Blocklisted Roles:\n```\n" + listing + "```")
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="add",
    description="Automatically purge messages after x seconds",
    options=[
        create_option(
            name="channel",
            description="Channel to autopurge",
            option_type=7,
            required=True,
        ),
        create_option(
            name="delay",
            description="Seconds to keep message before purge, default 30",
            option_type=4,
            required=False,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_add(
    self, ctx: SlashContext, channel: TextChannel, delay: int = 30
):
    """
    Create an Autopurge record for a text channel.

    :param ctx: Command context
    :param channel: Text channel to autopurge
    :param delay: Seconds to keep a message before purge, 1 to 300
    """
    problem = None
    if not isinstance(channel, TextChannel):
        problem = "Channel must be a TextChannel"
    elif delay <= 0:
        problem = "Delay must be > 0"
    elif delay > 300:
        problem = "Delay must be < 5 minutes"
    elif Autopurge.get(guild=ctx.guild.id, channel=channel.id):
        problem = "Autopurge already exists."
    if problem is not None:
        await ctx.send(problem, hidden=True)
        return

    record = Autopurge(
        guild=ctx.guild.id,
        channel=channel.id,
        admin=ctx.author.id,
        delay=delay,
    )
    record.insert()
    await ctx.send(
        f"Autopurge set up on {channel.mention}, "
        + f"delay is {delay} seconds"
    )
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="remove",
    description="Remove an autopurge",
    options=[
        create_option(
            name="channel",
            description="Channel to remove from autopurge",
            option_type=7,
            required=True,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel):
    """
    Delete the Autopurge record of a channel.

    :param ctx: Command context
    :param channel: Channel to remove from autopurge
    """
    record = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
    if record:
        record.delete()
        await ctx.send(f"Autopurge removed from {channel.mention}.")
    else:
        await ctx.send("Autopurge does not exist.", hidden=True)
|
|
|
|
@cog_ext.cog_subcommand(
    base="autopurge",
    name="update",
    description="Update autopurge on a channel",
    options=[
        create_option(
            name="channel",
            description="Channel to update",
            option_type=7,
            required=True,
        ),
        create_option(
            name="delay",
            description="New time to save",
            option_type=4,
            required=True,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_update(
    self, ctx: SlashContext, channel: TextChannel, delay: int
):
    """
    Change the delay of a channel's existing Autopurge record.

    :param ctx: Command context
    :param channel: Channel whose autopurge is updated
    :param delay: New delay in seconds, 1 to 300
    """
    # Same bounds as when an autopurge is created, so an update cannot
    # store a delay that creation would have rejected.
    if delay <= 0:
        await ctx.send("Delay must be > 0", hidden=True)
        return
    elif delay > 300:
        await ctx.send("Delay must be < 5 minutes", hidden=True)
        return
    autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
    if not autopurge:
        await ctx.send("Autopurge does not exist.", hidden=True)
        return
    autopurge.delay = delay
    autopurge.update()
    await ctx.send(
        f"Autopurge delay updated to {delay} seconds on {channel.mention}."
    )
|
|
|
|
@loop(minutes=1)
async def _expire_interaction(self):
    """
    Drop cached interactions that time out within the next minute.

    Runs once a minute. The look-ahead equals the loop interval, so an
    entry is removed on the last run before its timeout.
    """
    # Iterate over a snapshot because entries are deleted on the way.
    for key, entry in list(self.cache.items()):
        horizon = datetime.utcnow() + timedelta(minutes=1)
        if entry["timeout"] <= horizon:
            del self.cache[key]
|
|
|
|
|
|
def setup(bot):
    """Create the AdminCog for ``bot`` and register it."""
    cog = AdminCog(bot)
    bot.add_cog(cog)
|