# jarvis-bot/jarvis/cogs/admin.py
#
# 1316 lines
# 42 KiB
# Python

import re
from datetime import datetime, timedelta
from typing import Union
import pymongo
from ButtonPaginator import Paginator
from discord import Member, Role, TextChannel, User, VoiceChannel
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find, get
from discord_slash import SlashContext, cog_ext
from discord_slash.model import ButtonStyle
from discord_slash.utils.manage_commands import create_choice, create_option
import jarvis
from jarvis.db import DBManager
from jarvis.db.types import (
Autopurge,
Ban,
Kick,
Lock,
MongoSort,
Mute,
Purge,
Setting,
Unban,
Warning,
)
from jarvis.utils import build_embed
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class AdminCog(commands.Cog):
    """
    Guild admin functions

    Used to manage guilds: bans, kicks, purges, mutes, channel locks,
    server lockdowns, warnings, the roleping blocklist and autopurge.
    """

    def __init__(self, bot: commands.Bot):
        """
        Store the bot, open the database handle and start cache expiry.

        :param bot: Bot instance this cog is attached to
        """
        self.bot = bot
        config = jarvis.config.get_config()
        self.db = DBManager(config.mongo).mongo.jarvis
        # Open paginator interactions, keyed by hash(paginator)
        self.cache = {}
        self._expire_interaction.start()

    def cog_unload(self):
        """Cancel the cache expiry loop when the cog is removed."""
        # Without this the task started in __init__ keeps running after
        # the cog is unloaded or reloaded.
        self._expire_interaction.cancel()
def check_cache(self, ctx: "SlashContext", **kwargs):
    """
    Find an already-open paginator interaction matching this invocation.

    An entry matches when its ``command``, ``user`` and ``guild`` equal
    ``ctx.subcommand_name``, ``ctx.author.id`` and ``ctx.guild.id``, and
    every extra keyword is present in the entry with an equal value.

    :param ctx: Context of the current invocation
    :param kwargs: Extra entry fields that must also match
    :return: The first matching cache entry, or None if there is none
    """
    for entry in self.cache.values():
        if (
            entry["command"] == ctx.subcommand_name
            and entry["user"] == ctx.author.id
            and entry["guild"] == ctx.guild.id
            # An entry that lacks a requested key is simply not a match;
            # indexing it directly would raise KeyError.
            and all(k in entry and entry[k] == v for k, v in kwargs.items())
        ):
            return entry
    return None
@cog_ext.cog_slash(
    name="ban",
    description="Ban a user",
    options=[
        create_option(
            name="user",
            description="User to ban",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Ban reason",
            required=True,
            option_type=3,
        ),
        create_option(
            name="type",
            description="Ban type",
            option_type=3,
            required=False,
            choices=[
                create_choice(value="perm", name="Permanent"),
                create_choice(value="temp", name="Temporary"),
                create_choice(value="soft", name="Soft"),
            ],
        ),
        create_option(
            name="duration",
            description="Ban duration in hours if temporary",
            required=False,
            option_type=4,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _ban(
    self,
    ctx: SlashContext,
    user: User = None,
    reason: str = None,
    type: str = "perm",
    duration: int = 4,
):
    """
    Ban a user from the guild and record the ban.

    The user is sent a DM embed first (best effort), then banned. A
    "soft" ban is lifted again immediately and recorded as inactive.

    :param ctx: Invocation context
    :param user: User to ban
    :param reason: Ban reason, at most 100 characters
    :param type: One of "perm", "temp" or "soft"
    :param duration: Ban length in hours; only kept for "temp" bans
    """
    if not user or user == ctx.author:
        await ctx.send("You cannot ban yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if type == "temp" and duration < 0:
        await ctx.send(
            "You cannot set a temp ban to < 0 hours.", hidden=True
        )
        return
    elif type == "temp" and duration > 744:
        await ctx.send(
            "You cannot set a temp ban to > 1 month", hidden=True
        )
        return
    # Apply the default before measuring, so a missing reason cannot
    # crash len().
    if not reason:
        reason = (
            "Mr. Stark is displeased with your presence. Please leave."
        )
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    # Display name of the ban type: "perma", "temp" or "soft"
    mtype = "perma" if type == "perm" else type

    fields = [Field(name="Type", value=mtype)]
    if mtype == "temp":
        fields.append(Field(name="Duration", value=f"{duration} hour(s)"))

    user_embed = build_embed(
        title="You have been banned",
        description=f"Reason: {reason}",
        fields=fields,
    )
    user_embed.set_author(
        name=ctx.author.name + "#" + ctx.author.discriminator,
        icon_url=ctx.author.avatar_url,
    )
    user_embed.set_thumbnail(url=ctx.guild.icon_url)

    # Must be initialised before the attempt and never reset afterwards,
    # otherwise the "DM Sent?" field always reports success.
    send_failed = False
    try:
        await user.send(embed=user_embed)
    except Exception:
        # Best effort: the user may have DMs closed
        send_failed = True

    try:
        await ctx.guild.ban(user, reason=reason)
    except Exception as e:
        await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True)
        return
    if mtype == "soft":
        await ctx.guild.unban(user, reason="Ban was softban")

    fields.append(Field(name="DM Sent?", value=str(not send_failed)))
    admin_embed = build_embed(
        title="User Banned",
        description=f"Reason: {reason}",
        fields=fields,
    )
    # A plain User (not a guild member) has no nick attribute
    admin_embed.set_author(
        name=getattr(user, "nick", None) or user.name,
        icon_url=user.avatar_url,
    )
    admin_embed.set_thumbnail(url=user.avatar_url)
    admin_embed.set_footer(
        text=f"{user.name}#{user.discriminator} | {user.id}"
    )
    await ctx.send(embed=admin_embed)

    if type != "temp":
        duration = None
    # A soft ban has already been lifted, so it is stored as inactive
    active = type != "soft"
    _ = Ban(
        user=user.id,
        username=user.name,
        discrim=user.discriminator,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        type=type,
        duration=duration,
        active=active,
    ).insert()
async def discord_apply_unban(
    self, ctx: SlashContext, user: User, reason: str
):
    """
    Lift a Discord ban, record the unban and report it in the channel.

    :param ctx: Invocation context
    :param user: Banned user to release
    :param reason: Unban reason, used for the audit log and the record
    """
    guild = ctx.guild
    await guild.unban(user, reason=reason)

    record = Unban(
        user=user.id,
        username=user.name,
        discrim=user.discriminator,
        guild=guild.id,
        admin=ctx.author.id,
        reason=reason,
    )
    record.insert()

    avatar = user.avatar_url
    footer = f"{user.name}#{user.discriminator} | {user.id}"
    report = build_embed(
        title="User Unbanned",
        description=f"<@{user.id}> was unbanned",
        fields=[Field(name="Reason", value=reason)],
    )
    report.set_author(name=user.name, icon_url=avatar)
    report.set_thumbnail(url=avatar)
    report.set_footer(text=footer)
    await ctx.send(embed=report)
@cog_ext.cog_slash(
    name="unban",
    description="Unban a user",
    options=[
        create_option(
            name="user",
            description="User to unban",
            option_type=3,
            required=True,
        ),
        create_option(
            name="reason",
            description="Unban reason",
            required=True,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _unban(
    self,
    ctx: SlashContext,
    user: str,
    reason: str,
):
    """
    Unban a user identified by ID, by name, or by name#discriminator.

    The guild's ban list is searched first; if nothing matches, the
    active bans stored in the database are searched instead.

    :param ctx: Invocation context
    :param user: User ID, user name, or "name#1234"
    :param reason: Unban reason, at most 100 characters
    """
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    orig_user = user
    discrim = None
    discord_ban_info = None
    database_ban_info = None

    bans = await ctx.guild.bans()

    # Try to get ban information out of Discord
    if re.match(r"^[0-9]{1,}$", user):  # User ID
        user = int(user)
        discord_ban_info = find(lambda x: x.user.id == user, bans)
    else:  # User name
        # re.search, not re.match: the discriminator sits at the END of
        # the string. rsplit keeps any "#" inside the name itself.
        if re.search(r"#[0-9]{4}$", user):  # User name has discrim
            user, discrim = user.rsplit("#", 1)
        if discrim:
            discord_ban_info = find(
                lambda x: x.user.name == user
                and x.user.discriminator == discrim,
                bans,
            )
        else:
            results = [x for x in bans if x.user.name == user]
            if len(results) > 1:
                # List only the ambiguous matches, not every ban
                candidates = "\n".join(
                    f"{ban.user.name} ({ban.user.id}): {ban.reason}"
                    for ban in results
                )
                message = (
                    "More than one result. "
                    + "Please use one of the following IDs:\n```"
                    + candidates
                    + "\n```"
                )
                await ctx.send(message)
                return
            if results:
                discord_ban_info = results[0]

    # If we don't have the ban information in Discord,
    # try to find the relevant information in the database.
    # We take advantage of the previous checks to save CPU cycles
    if not discord_ban_info:
        if isinstance(user, int):
            database_ban_info = Ban.get(
                guild=ctx.guild.id, user=user, active=True
            )
        else:
            search = {
                "guild": ctx.guild.id,
                "username": user,
                "active": True,
            }
            if discrim:
                search["discrim"] = discrim
            database_ban_info = Ban.get(**search)

    if not discord_ban_info and not database_ban_info:
        await ctx.send(f"Unable to find user {orig_user}", hidden=True)
    elif discord_ban_info:
        await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
    else:
        # The record's user ID is read as an attribute, matching the
        # other accesses to this record below.
        discord_ban_info = find(
            lambda x: x.user.id == database_ban_info.user, bans
        )
        if discord_ban_info:
            await self.discord_apply_unban(
                ctx, discord_ban_info.user, reason
            )
        else:
            database_ban_info.active = False
            database_ban_info.update()
            _ = Unban(
                user=database_ban_info.user,
                username=database_ban_info.username,
                discrim=database_ban_info.discrim,
                guild=ctx.guild.id,
                admin=ctx.author.id,
                reason=reason,
            ).insert()
            await ctx.send(
                "Unable to find user in Discord, "
                + "but removed entry from database."
            )
@cog_ext.cog_subcommand(
    base="bans",
    name="list",
    description="List bans",
    options=[
        create_option(
            name="type",
            description="Ban type",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=0, name="All"),
                create_choice(value=1, name="Permanent"),
                create_choice(value=2, name="Temporary"),
                create_choice(value=3, name="Soft"),
            ],
        ),
        create_option(
            name="active",
            description="Active bans",
            option_type=4,
            required=False,
            choices=[
                create_choice(value=1, name="Yes"),
                create_choice(value=0, name="No"),
            ],
        ),
    ],
)
@admin_or_permissions(ban_members=True)
async def _bans_list(
    self, ctx: SlashContext, type: int = 0, active: int = 1
):
    """
    Show a paginated list of bans for the guild, five per page.

    Bans come from the database; when ``type`` is 0 the guild's own ban
    list is added for users with no database record ("manual" bans).

    :param ctx: Invocation context
    :param type: 0 = all, 1 = permanent, 2 = temporary, 3 = soft
    :param active: 1 to restrict the database search to active bans
    """
    active = bool(active)
    exists = self.check_cache(ctx, type=type, active=active)
    if exists:
        await ctx.defer(hidden=True)
        await ctx.send(
            "Please use existing interaction: "
            + f"{exists['paginator']._message.jump_url}",
            hidden=True,
        )
        return

    types = [0, "perm", "temp", "soft"]
    search = {"guild": ctx.guild.id}
    if active:
        search["active"] = True
    if type > 0:
        search["type"] = types[type]
    bans = Ban.get_many(**search)
    bans.sort(key=lambda x: x.created_at, reverse=True)

    # User IDs already listed from the database
    db_bans = set()
    fields = []
    for ban in bans:
        if not ban.username:
            user = await self.bot.fetch_user(ban.user)
            ban.username = user.name if user else "[deleted user]"
        fields.append(
            Field(
                name=f"Username: {ban.username}#{ban.discrim}",
                value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n"
                + f"User ID: {ban.user}\n"
                + f"Reason: {ban.reason}\n"
                + f"Type: {ban.type}\n\u200b",
                inline=False,
            )
        )
        db_bans.add(ban.user)

    if type == 0:
        guild_bans = await ctx.guild.bans()
        for ban in guild_bans:
            if ban.user.id not in db_bans:
                fields.append(
                    Field(
                        name=f"Username: {ban.user.name}#"
                        + f"{ban.user.discriminator}",
                        value="Date: [unknown]\n"
                        + f"User ID: {ban.user.id}\n"
                        + f"Reason: {ban.reason}\n"
                        + "Type: manual\n\u200b",
                        inline=False,
                    )
                )

    pages = []
    title = "Active " if active else "Inactive "
    if type > 0:
        title += types[type]
    if type == 1:
        title += "a"
    title += "bans"

    if not fields:
        embed = build_embed(
            title=title,
            description=f"No {'in' if not active else ''}active bans",
            fields=[],
        )
        embed.set_thumbnail(url=ctx.guild.icon_url)
        pages.append(embed)
    else:
        # Page over the fields actually built. The ban list can differ
        # in length from the fields, so using it dropped or added pages.
        for i in range(0, len(fields), 5):
            embed = build_embed(
                title=title, description="", fields=fields[i : i + 5]
            )
            embed.set_thumbnail(url=ctx.guild.icon_url)
            pages.append(embed)

    paginator = Paginator(
        bot=self.bot,
        ctx=ctx,
        embeds=pages,
        only=ctx.author,
        timeout=60 * 5,  # 5 minute timeout
        disable_after_timeout=True,
        use_extend=len(pages) > 2,
        left_button_style=ButtonStyle.grey,
        right_button_style=ButtonStyle.grey,
        basic_buttons=["", ""],
    )

    # Remember the interaction so a repeated call can point back to it
    self.cache[hash(paginator)] = {
        "guild": ctx.guild.id,
        "user": ctx.author.id,
        "timeout": datetime.utcnow() + timedelta(minutes=5),
        "command": ctx.subcommand_name,
        "type": type,
        "active": active,
        "paginator": paginator,
    }

    await paginator.start()
@cog_ext.cog_slash(
    name="kick",
    description="Kick a user",
    options=[
        create_option(
            name="user",
            description="User to kick",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Kick reason",
            required=False,
            option_type=3,
        ),
    ],
)
@admin_or_permissions(kick_members=True)
async def _kick(self, ctx: SlashContext, user: User, reason=None):
    """
    Kick a user from the guild and record the kick.

    The user is sent a DM embed first (best effort), then kicked.

    :param ctx: Invocation context
    :param user: User to kick
    :param reason: Optional kick reason, at most 100 characters
    """
    if not user or user == ctx.author:
        await ctx.send("You cannot kick yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    # The reason is optional: apply the default before measuring it,
    # otherwise len(None) raises TypeError.
    if not reason:
        reason = (
            "Mr. Stark is displeased with your presence. Please leave."
        )
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    guild_name = ctx.guild.name
    embed = build_embed(
        title=f"You have been kicked from {guild_name}",
        description=f"Reason: {reason}",
        fields=[],
    )
    embed.set_author(
        name=ctx.author.name + "#" + ctx.author.discriminator,
        icon_url=ctx.author.avatar_url,
    )
    # url is passed by keyword, as in every other set_thumbnail call here
    embed.set_thumbnail(url=ctx.guild.icon_url)

    send_failed = False
    try:
        await user.send(embed=embed)
    except Exception:
        # Best effort: the user may have DMs closed
        send_failed = True
    await ctx.guild.kick(user, reason=reason)

    fields = [Field(name="DM Sent?", value=str(not send_failed))]
    embed = build_embed(
        title="User Kicked",
        description=f"Reason: {reason}",
        fields=fields,
    )
    # A plain User (not a guild member) has no nick attribute
    embed.set_author(
        name=getattr(user, "nick", None) or user.name,
        icon_url=user.avatar_url,
    )
    embed.set_thumbnail(url=user.avatar_url)
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=embed)

    _ = Kick(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
    ).insert()
@cog_ext.cog_slash(
    name="purge",
    description="Purge messages from channel",
    options=[
        create_option(
            name="amount",
            description="Amount of messages to purge",
            option_type=4,
            required=False,
        )
    ],
)
@admin_or_permissions(manage_messages=True)
async def _purge(self, ctx: SlashContext, amount: int = 10):
    """
    Delete the most recent messages in the current channel.

    ``amount + 1`` messages are read from the channel history and
    deleted in one call; the purge is then recorded with ``amount``.

    :param ctx: Invocation context
    :param amount: Number of messages to purge, at least 1
    """
    if amount < 1:
        await ctx.send("Amount must be >= 1", hidden=True)
        return

    await ctx.defer()
    target = ctx.channel
    # NOTE(review): the extra message presumably covers the deferred
    # response itself - confirm.
    doomed = [msg async for msg in target.history(limit=amount + 1)]
    await target.delete_messages(doomed)

    record = Purge(
        channel=ctx.channel.id,
        guild=ctx.guild.id,
        admin=ctx.author.id,
        count=amount,
    )
    record.insert()
@cog_ext.cog_slash(
    name="mute",
    description="Mute a user",
    options=[
        create_option(
            name="user",
            description="User to mute",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for mute",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Mute duration",
            option_type=4,
            required=False,
        ),
    ],
)
@admin_or_permissions(mute_members=True)
async def _mute(
    self, ctx: SlashContext, user: Member, reason: str, duration: int = 30
):
    """
    Give a member the guild's configured mute role and record the mute.

    :param ctx: Invocation context
    :param user: Member to mute
    :param reason: Mute reason, at most 100 characters
    :param duration: Mute duration; values outside 0..300 are stored as
        -1 and the record is marked inactive
    """
    if user == ctx.author:
        await ctx.send("You cannot mute yourself.", hidden=True)
        return
    if user == self.bot.user:
        await ctx.send("I'm afraid I can't let you do that", hidden=True)
        return
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    mute_setting = Setting.get(guild=ctx.guild.id, setting="mute")
    if not mute_setting:
        await ctx.send(
            "Please configure a mute role "
            + "with /settings mute <role> first",
            hidden=True,
        )
        return
    role = get(ctx.guild.roles, id=mute_setting.value)
    if role is None:
        # The setting points at a role that is no longer in the guild;
        # add_roles(None) would fail with an unhelpful error.
        await ctx.send(
            "Configured mute role no longer exists. "
            + "Please reconfigure it with /settings mute <role>",
            hidden=True,
        )
        return
    if role in user.roles:
        await ctx.send("User already muted", hidden=True)
        return
    await user.add_roles(role, reason=reason)
    if duration < 0 or duration > 300:
        duration = -1
    _ = Mute(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        duration=duration,
        active=duration >= 0,
    ).insert()

    embed = build_embed(
        title="User Muted",
        description=f"{user.mention} has been muted",
        fields=[Field(name="Reason", value=reason)],
    )
    embed.set_author(
        name=user.nick if user.nick else user.name,
        icon_url=user.avatar_url,
    )
    embed.set_thumbnail(url=user.avatar_url)
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=embed)
@cog_ext.cog_slash(
    name="unmute",
    description="Unmute a user",
    options=[
        create_option(
            name="user",
            description="User to unmute",
            option_type=6,
            required=True,
        )
    ],
)
@admin_or_permissions(mute_members=True)
async def _unmute(self, ctx: SlashContext, user: Member):
    """
    Remove the configured mute role from a member.

    Every mute record for the member in this guild is marked inactive.

    :param ctx: Invocation context
    :param user: Member to unmute
    """
    mute_setting = Setting.get(guild=ctx.guild.id, setting="mute")
    if not mute_setting:
        await ctx.send(
            "Please configure a mute role with "
            + "/settings mute <role> first.",
            hidden=True,
        )
        return

    role = get(ctx.guild.roles, id=mute_setting.value)
    if role in user.roles:
        await user.remove_roles(role, reason="Unmute")
    else:
        await ctx.send("User is not muted.", hidden=True)
        return

    mutes = Mute.get_many(guild=ctx.guild.id, user=user.id)
    for mute in mutes:
        mute.active = False
        mute.update()

    embed = build_embed(
        title="User Unmuted",  # was misspelled "User Unmwaruted"
        description=f"{user.mention} has been unmuted",
        fields=[],
    )
    embed.set_author(
        name=user.nick if user.nick else user.name,
        icon_url=user.avatar_url,
    )
    embed.set_thumbnail(url=user.avatar_url)
    embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=embed)
async def _lock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
    reason: str,
    allow_send=False,
):
    """
    Set a role's send (text) or speak (voice) override on a channel.

    :param channel: Channel whose permission overwrite is changed
    :param role: Role the overwrite applies to
    :param admin: Admin requesting the lock (not used here)
    :param reason: Reason passed along with the permission change
    :param allow_send: Value written to the send/speak permission
    """
    # Pick the permission matching the channel kind; other kinds are
    # written back without modification.
    permission = None
    if isinstance(channel, TextChannel):
        permission = "send_messages"
    elif isinstance(channel, VoiceChannel):
        permission = "speak"

    overwrite = channel.overwrites_for(role)
    if permission is not None:
        setattr(overwrite, permission, allow_send)
    await channel.set_permissions(role, overwrite=overwrite, reason=reason)
async def _unlock_channel(
    self,
    channel: Union[TextChannel, VoiceChannel],
    role: Role,
    admin: User,
):
    """
    Clear a role's send (text) or speak (voice) override on a channel.

    :param channel: Channel whose permission overwrite is changed
    :param role: Role the overwrite applies to
    :param admin: Admin requesting the unlock (not used here)
    """
    # Pick the permission matching the channel kind; other kinds are
    # written back without modification.
    permission = None
    if isinstance(channel, TextChannel):
        permission = "send_messages"
    elif isinstance(channel, VoiceChannel):
        permission = "speak"

    overwrite = channel.overwrites_for(role)
    if permission is not None:
        # None resets the permission to "unset" rather than deny/allow
        setattr(overwrite, permission, None)
    await channel.set_permissions(role, overwrite=overwrite)
@cog_ext.cog_slash(
    name="lock",
    description="Locks a channel",
    options=[
        create_option(
            name="reason",
            description="Lock Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lock duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
        create_option(
            name="channel",
            description="Channel to lock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lock(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """
    Lock a channel for every role in the guild and record the lock.

    :param ctx: Invocation context
    :param reason: Lock reason, at most 100 characters
    :param duration: Lock duration in minutes, between 1 and 299
    :param channel: Channel to lock; the current channel when omitted
    """
    await ctx.defer(hidden=True)

    if duration <= 0:
        await ctx.send("Duration must be > 0", hidden=True)
        return
    if duration >= 300:
        await ctx.send("Duration must be < 5 hours", hidden=True)
        return
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    target = channel if channel else ctx.channel
    for guild_role in ctx.guild.roles:
        try:
            await self._lock_channel(target, guild_role, ctx.author, reason)
        except Exception:
            # Just continue on error
            pass

    record = Lock(
        channel=target.id,
        guild=ctx.guild.id,
        admin=ctx.author.id,
        reason=reason,
        duration=duration,
    )
    record.insert()
    await ctx.send(f"{target.mention} locked for {duration} minute(s)")
@cog_ext.cog_slash(
    name="unlock",
    description="Unlocks a channel",
    options=[
        create_option(
            name="channel",
            # Was "Channel to lock", copied from the lock command
            description="Channel to unlock",
            option_type=7,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _unlock(
    self,
    ctx: SlashContext,
    channel: Union[TextChannel, VoiceChannel] = None,
):
    """
    Unlock a channel for every role and mark its lock record inactive.

    :param ctx: Invocation context
    :param channel: Channel to unlock; the current channel when omitted
    """
    if not channel:
        channel = ctx.channel
    lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True)
    if not lock:
        await ctx.send(f"{channel.mention} not locked.", hidden=True)
        return
    for role in ctx.guild.roles:
        try:
            await self._unlock_channel(channel, role, ctx.author)
        except Exception:
            continue  # Just continue on error
    lock.active = False
    lock.update()
    await ctx.send(f"{channel.mention} unlocked")
@cog_ext.cog_subcommand(
    base="lockdown",
    name="start",
    description="Locks a server",
    options=[
        create_option(
            name="reason",
            description="Lockdown Reason",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Lockdown duration in minutes (default 10)",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _lockdown_start(
    self,
    ctx: SlashContext,
    reason: str,
    duration: int = 10,
):
    """
    Lock every channel in the guild for every role.

    One lock record per channel is written to the ``locks`` collection
    in a single bulk operation.

    :param ctx: Invocation context
    :param reason: Lockdown reason, at most 100 characters
    :param duration: Lockdown duration in minutes, between 1 and 299
    """
    await ctx.defer(hidden=True)
    if duration <= 0:
        await ctx.send("Duration must be > 0", hidden=True)
        return
    elif duration >= 300:
        await ctx.send("Duration must be < 5 hours", hidden=True)
        return
    # Same limit as the single-channel lock command
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return

    channels = ctx.guild.channels
    roles = ctx.guild.roles
    updates = []
    for channel in channels:
        for role in roles:
            try:
                await self._lock_channel(channel, role, ctx.author, reason)
            except Exception:
                continue  # Just continue on error
        updates.append(
            pymongo.InsertOne(
                {
                    "channel": channel.id,
                    "guild": ctx.guild.id,
                    "admin": ctx.author.id,
                    "reason": reason,
                    "duration": duration,
                    "active": True,
                    "created_at": datetime.utcnow(),
                }
            )
        )
    if updates:
        self.db.locks.bulk_write(updates)
    await ctx.send(f"Server locked for {duration} minute(s)")
@cog_ext.cog_subcommand(
    base="lockdown",
    name="end",
    description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
async def _lockdown_end(
    self,
    ctx: SlashContext,
):
    """
    Unlock every channel in the guild and deactivate its lock records.

    :param ctx: Invocation context
    """
    channels = ctx.guild.channels
    roles = ctx.guild.roles
    updates = []
    locks = Lock.get_many(guild=ctx.guild.id, active=True)
    if not locks:
        await ctx.send("No lockdown detected.", hidden=True)
        return
    await ctx.defer()
    for channel in channels:
        for role in roles:
            try:
                await self._unlock_channel(channel, role, ctx.author)
            except Exception:
                continue  # Just continue on error
        # Match every active lock on the channel, whoever created it.
        # Filtering on the invoking admin left locks started by another
        # admin active in the database after the channels were unlocked.
        updates.append(
            pymongo.UpdateMany(
                {
                    "channel": channel.id,
                    "guild": ctx.guild.id,
                    "active": True,
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        self.db.locks.bulk_write(updates)
    await ctx.send("Server unlocked")
@cog_ext.cog_slash(
    name="warn",
    description="Warn a user",
    options=[
        create_option(
            name="user",
            description="User to warn",
            option_type=6,
            required=True,
        ),
        create_option(
            name="reason",
            description="Reason for warning",
            option_type=3,
            required=True,
        ),
        create_option(
            name="duration",
            description="Duration of warning in hours, default 24",
            option_type=4,
            required=False,
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warn(
    self, ctx: SlashContext, user: User, reason: str, duration: int = 24
):
    """
    Record an active warning against a user and announce it.

    :param ctx: Invocation context
    :param user: User to warn
    :param reason: Warning reason, at most 100 characters
    :param duration: Warning duration in hours, between 1 and 119
    """
    if len(reason) > 100:
        await ctx.send("Reason must be < 100 characters", hidden=True)
        return
    if duration <= 0:
        await ctx.send("Duration must be > 0", hidden=True)
        return
    if duration >= 120:
        await ctx.send("Duration must be < 5 days", hidden=True)
        return

    await ctx.defer()
    record = Warning(
        user=user.id,
        reason=reason,
        admin=ctx.author.id,
        guild=ctx.guild.id,
        duration=duration,
        active=True,
    )
    record.insert()

    notice = build_embed(
        title="Warning",
        description=f"{user.mention} has been warned",
        fields=[Field("Reason", reason, False)],
    )
    display_name = user.nick or user.name
    notice.set_author(name=display_name, icon_url=user.avatar_url)
    notice.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
    await ctx.send(embed=notice)
@cog_ext.cog_slash(
    name="warnings",
    description="Get count of user warnings",
    options=[
        create_option(
            name="user",
            description="User to view",
            option_type=6,
            required=True,
        ),
        create_option(
            name="active",
            description="View only active",
            option_type=4,
            required=False,
            choices=[
                create_choice(name="Yes", value=1),
                create_choice(name="No", value=0),
            ],
        ),
    ],
)
@commands.has_permissions(administrator=True)
async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1):
    """
    Show a paginated list of a user's warnings, five per page.

    :param ctx: Invocation context
    :param user: User whose warnings are listed
    :param active: 1 to list only active warnings (with the issuing
        admin), 0 to list all warnings tagged [A]ctive or [I]nactive
    """
    active = bool(active)
    exists = self.check_cache(ctx, user_id=user.id, active=active)
    if exists:
        await ctx.defer(hidden=True)
        await ctx.send(
            "Please use existing interaction: "
            + f"{exists['paginator']._message.jump_url}",
            hidden=True,
        )
        return

    warnings = Warning.get_many(
        user=user.id,
        guild=ctx.guild.id,
        sort=MongoSort(direction="desc", key="created_at"),
    )
    active_warns = list(filter(lambda x: x.active, warnings))

    pages = []
    if active:
        if len(active_warns) == 0:
            embed = build_embed(
                title="Warnings",
                description=f"{len(warnings)} total | 0 currently active",
                fields=[],
            )
            embed.set_author(name=user.name, icon_url=user.avatar_url)
            embed.set_thumbnail(url=ctx.guild.icon_url)
            pages.append(embed)
        else:
            fields = []
            for warn in active_warns:
                # Guild has no get(); members are looked up by ID with
                # get_member, which returns None when not found.
                admin = ctx.guild.get_member(warn.admin)
                admin_name = "||`[redacted]`||"
                if admin:
                    admin_name = f"{admin.name}#{admin.discriminator}"
                fields.append(
                    Field(
                        name=warn.created_at.strftime(
                            "%Y-%m-%d %H:%M:%S UTC"
                        ),
                        value=f"{warn.reason}\n"
                        + f"Admin: {admin_name}\n"
                        + "\u200b",
                        inline=False,
                    )
                )
            for i in range(0, len(fields), 5):
                embed = build_embed(
                    title="Warnings",
                    description=f"{len(warnings)} total | "
                    + f"{len(active_warns)} currently active",
                    fields=fields[i : i + 5],
                )
                embed.set_author(
                    name=user.name + "#" + user.discriminator,
                    icon_url=user.avatar_url,
                )
                embed.set_thumbnail(url=ctx.guild.icon_url)
                embed.set_footer(
                    text=f"{user.name}#{user.discriminator} | {user.id}"
                )
                pages.append(embed)
    else:
        fields = []
        for warn in warnings:
            title = "[A] " if warn.active else "[I] "
            title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")
            fields.append(
                Field(
                    name=title,
                    value=warn.reason + "\n\u200b",
                    inline=False,
                )
            )
        for i in range(0, len(fields), 5):
            embed = build_embed(
                title="Warnings",
                description=f"{len(warnings)} total | "
                + f"{len(active_warns)} currently active",
                fields=fields[i : i + 5],
            )
            embed.set_author(
                name=user.name + "#" + user.discriminator,
                icon_url=user.avatar_url,
            )
            embed.set_thumbnail(url=ctx.guild.icon_url)
            pages.append(embed)
        if not pages:
            # No warnings at all: still give the paginator one page
            embed = build_embed(
                title="Warnings",
                description=f"{len(warnings)} total | "
                + f"{len(active_warns)} currently active",
                fields=[],
            )
            embed.set_author(
                name=user.name + "#" + user.discriminator,
                icon_url=user.avatar_url,
            )
            embed.set_thumbnail(url=ctx.guild.icon_url)
            pages.append(embed)

    paginator = Paginator(
        bot=self.bot,
        ctx=ctx,
        embeds=pages,
        only=ctx.author,
        timeout=60 * 5,  # 5 minute timeout
        disable_after_timeout=True,
        use_extend=len(pages) > 2,
        left_button_style=ButtonStyle.grey,
        right_button_style=ButtonStyle.grey,
        basic_buttons=["", ""],
    )

    # Remember the interaction so a repeated call can point back to it
    self.cache[hash(paginator)] = {
        "guild": ctx.guild.id,
        "user": ctx.author.id,
        "timeout": datetime.utcnow() + timedelta(minutes=5),
        "command": ctx.subcommand_name,
        "user_id": user.id,
        "active": active,
        "paginator": paginator,
    }

    await paginator.start()
@cog_ext.cog_subcommand(
    base="roleping",
    name="block",
    description="Add a role to the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to add to blocklist",
            required=True,
            option_type=8,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
    """
    Add a role to the guild's roleping blocklist setting.

    A new, empty "roleping" setting is created when the guild has none.

    :param ctx: Invocation context
    :param role: Role to add to the blocklist
    """
    guild_id = ctx.guild.id
    blocklist = Setting.get(guild=guild_id, setting="roleping")
    if not blocklist:
        blocklist = Setting(guild=guild_id, setting="roleping", value=[])

    already_blocked = role.id in blocklist.value
    if already_blocked:
        await ctx.send(
            f"Role `{role.name}` already in blocklist.", hidden=True
        )
        return

    blocklist.value.append(role.id)
    blocklist.update()
    await ctx.send(f"Role `{role.name}` added to blocklist.")
@cog_ext.cog_subcommand(
    base="roleping",
    name="allow",
    description="Remove a role from the roleping blocklist",
    options=[
        create_option(
            name="role",
            description="Role to remove from blocklist",
            option_type=8,
            required=True,
        )
    ],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
    """
    Remove a role from the guild's roleping blocklist setting.

    :param ctx: Invocation context
    :param role: Role to remove from the blocklist
    """
    roles = Setting.get(guild=ctx.guild.id, setting="roleping")
    if not roles:
        await ctx.send("No blocklist configured.", hidden=True)
        return
    if role.id not in roles.value:
        await ctx.send(
            f"Role `{role.name}` not in blocklist.", hidden=True
        )
        return
    roles.value.remove(role.id)
    roles.update()
    # Message previously read "removed blocklist."
    await ctx.send(f"Role `{role.name}` removed from blocklist.")
@cog_ext.cog_subcommand(
    base="roleping",
    name="list",
    description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
    """
    Send the names of all roles on the guild's roleping blocklist.

    Blocklisted IDs that no longer resolve to a guild role are skipped.

    :param ctx: Invocation context
    """
    blocklist = Setting.get(guild=ctx.guild.id, setting="roleping")
    if not blocklist:
        await ctx.send("No blocklist configured.", hidden=True)
        return
    if not blocklist.value:
        await ctx.send("No roles blocklisted.", hidden=True)
        return

    lines = []
    for role_id in blocklist.value:
        resolved = ctx.guild.get_role(role_id)
        if resolved:
            lines.append(resolved.name + "\n")

    message = "Blocklisted Roles:\n```\n" + "".join(lines) + "```"
    await ctx.send(message)
@cog_ext.cog_subcommand(
    base="autopurge",
    name="add",
    description="Automatically purge messages after x seconds",
    options=[
        create_option(
            name="channel",
            description="Channel to autopurge",
            required=True,
            option_type=7,
        ),
        create_option(
            name="delay",
            description="Seconds to keep message before purge, default 30",
            required=False,
            option_type=4,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_add(
    self, ctx: SlashContext, channel: TextChannel, delay: int = 30
):
    """
    Register an autopurge for a text channel.

    :param ctx: Invocation context
    :param channel: Text channel to autopurge
    :param delay: Seconds to keep a message, between 1 and 300
    """
    if not isinstance(channel, TextChannel):
        await ctx.send("Channel must be a TextChannel", hidden=True)
        return
    if delay <= 0:
        await ctx.send("Delay must be > 0", hidden=True)
        return
    if delay > 300:
        await ctx.send("Delay must be < 5 minutes", hidden=True)
        return

    guild_id = ctx.guild.id
    existing = Autopurge.get(guild=guild_id, channel=channel.id)
    if existing:
        await ctx.send("Autopurge already exists.", hidden=True)
        return

    entry = Autopurge(
        guild=guild_id,
        channel=channel.id,
        admin=ctx.author.id,
        delay=delay,
    )
    entry.insert()
    confirmation = (
        f"Autopurge set up on {channel.mention}, "
        + f"delay is {delay} seconds"
    )
    await ctx.send(confirmation)
@cog_ext.cog_subcommand(
    base="autopurge",
    name="remove",
    description="Remove an autopurge",
    options=[
        create_option(
            name="channel",
            description="Channel to remove from autopurge",
            required=True,
            option_type=7,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel):
    """
    Delete the autopurge registered for a channel, if there is one.

    :param ctx: Invocation context
    :param channel: Channel to remove from autopurge
    """
    entry = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
    if entry:
        entry.delete()
        await ctx.send(f"Autopurge removed from {channel.mention}.")
    else:
        await ctx.send("Autopurge does not exist.", hidden=True)
@cog_ext.cog_subcommand(
    base="autopurge",
    name="update",
    description="Update autopurge on a channel",
    options=[
        create_option(
            name="channel",
            description="Channel to update",
            option_type=7,
            required=True,
        ),
        create_option(
            name="delay",
            description="New time to save",
            option_type=4,
            required=True,
        ),
    ],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_update(
    self, ctx: SlashContext, channel: TextChannel, delay: int
):
    """
    Change the delay of an existing autopurge.

    :param ctx: Invocation context
    :param channel: Channel whose autopurge is updated
    :param delay: New delay in seconds, between 1 and 300
    """
    # Same bounds as when an autopurge is first added; without them an
    # update could store a zero, negative or unbounded delay.
    if delay <= 0:
        await ctx.send("Delay must be > 0", hidden=True)
        return
    elif delay > 300:
        await ctx.send("Delay must be < 5 minutes", hidden=True)
        return
    autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
    if not autopurge:
        await ctx.send("Autopurge does not exist.", hidden=True)
        return
    autopurge.delay = delay
    autopurge.update()
    await ctx.send(
        f"Autopurge delay updated to {delay} seconds on {channel.mention}."
    )
@loop(minutes=1)
async def _expire_interaction(self):
    """
    Drop cached paginator interactions whose timeout is due.

    Runs every minute; an entry is removed once its timeout falls
    within the next minute.
    """
    # Iterate over a snapshot so entries can be deleted while looping
    for cache_key, entry in list(self.cache.items()):
        cutoff = datetime.utcnow() + timedelta(minutes=1)
        if entry["timeout"] <= cutoff:
            del self.cache[cache_key]
def setup(bot):
    """Create the admin cog and register it on the bot."""
    cog = AdminCog(bot)
    bot.add_cog(cog)