Split AdminCog into numerous other cogs

This commit is contained in:
Zeva Rose 2021-07-25 16:29:07 -06:00
parent f697d07728
commit 98fa85b787
11 changed files with 1431 additions and 1316 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,21 @@
from jarvis.cogs.admin import (
ban,
kick,
lock,
lockdown,
mute,
purge,
roleping,
warning,
)
def setup(bot):
bot.add_cog(ban.BanCog(bot))
bot.add_cog(kick.KickCog(bot))
bot.add_cog(lock.LockCog(bot))
bot.add_cog(lockdown.LockdownCog(bot))
bot.add_cog(mute.MuteCog(bot))
bot.add_cog(purge.PurgeCog(bot))
bot.add_cog(roleping.RolepingCog(bot))
bot.add_cog(warning.WarningCog(bot))

455
jarvis/cogs/admin/ban.py Normal file
View file

@ -0,0 +1,455 @@
import re
from datetime import datetime, timedelta
from ButtonPaginator import Paginator
from discord import User
from discord.ext import commands
from discord.utils import find
from discord_slash import SlashContext, cog_ext
from discord_slash.model import ButtonStyle
from discord_slash.utils.manage_commands import create_choice, create_option
from jarvis.db.types import Ban, Unban
from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class BanCog(CacheCog):
def __init__(self, bot: commands.Bot):
super().__init__(bot)
async def discord_apply_ban(
self,
ctx: SlashContext,
reason: str,
user: User,
duration: int,
active: bool,
fields: list,
):
await ctx.guild.ban(user, reason=reason)
_ = Ban(
user=user.id,
username=user.name,
discrim=user.discriminator,
reason=reason,
admin=ctx.author.id,
guild=ctx.guild.id,
type=type,
duration=duration,
active=active,
).insert()
embed = build_embed(
title="User Banned",
description=f"Reason: {reason}",
fields=fields,
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=user.avatar_url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
async def discord_apply_unban(
self, ctx: SlashContext, user: User, reason: str
):
await ctx.guild.unban(user, reason=reason)
_ = Unban(
user=user.id,
username=user.name,
discrim=user.discriminator,
guild=ctx.guild.id,
admin=ctx.author.id,
reason=reason,
).insert()
embed = build_embed(
title="User Unbanned",
description=f"<@{user.id}> was unbanned",
fields=[Field(name="Reason", value=reason)],
)
embed.set_author(
name=user.name,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=user.avatar_url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
@cog_ext.cog_slash(
name="ban",
description="Ban a user",
options=[
create_option(
name="user",
description="User to ban",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Ban reason",
required=True,
option_type=3,
),
create_option(
name="type",
description="Ban type",
option_type=3,
required=False,
choices=[
create_choice(value="perm", name="Permanent"),
create_choice(value="temp", name="Temporary"),
create_choice(value="soft", name="Soft"),
],
),
create_option(
name="duration",
description="Ban duration in hours if temporary",
required=False,
option_type=4,
),
],
)
@admin_or_permissions(ban_members=True)
async def _ban(
self,
ctx: SlashContext,
user: User = None,
reason: str = None,
type: str = "perm",
duration: int = 4,
):
if not user or user == ctx.author:
await ctx.send("You cannot ban yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
if type == "temp" and duration < 0:
await ctx.send(
"You cannot set a temp ban to < 0 hours.", hidden=True
)
return
elif type == "temp" and duration > 744:
await ctx.send(
"You cannot set a temp ban to > 1 month", hidden=True
)
return
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
if not reason:
reason = (
"Mr. Stark is displeased with your presence. Please leave."
)
mtype = type
if mtype == "perm":
mtype = "perma"
guild_name = ctx.guild.name
user_message = (
f"You have been {mtype}banned from {guild_name}."
+ f" Reason:\n{reason}"
)
if mtype == "temp":
user_message += f"\nDuration: {duration} hours"
fields = [Field(name="Type", value=mtype)]
if mtype == "temp":
fields.append(Field(name="Duration", value=f"{duration} hour(s)"))
user_embed = build_embed(
title="You have been banned",
description=f"Reason: {reason}",
fields=fields,
)
user_embed.set_author(
name=ctx.author.name + "#" + ctx.author.discriminator,
icon_url=ctx.author.avatar_url,
)
user_embed.set_thumbnail(url=ctx.guild.icon_url)
try:
await user.send(embed=user_embed)
except Exception:
send_failed = True
try:
await ctx.guild.ban(user, reason=reason)
except Exception as e:
await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True)
return
send_failed = False
if mtype == "soft":
await ctx.guild.unban(user, reason="Ban was softban")
fields.append(Field(name="DM Sent?", value=str(not send_failed)))
if type != "temp":
duration = None
active = True
if type == "soft":
active = False
self.discord_apply_ban(ctx, reason, user, duration, active, fields)
@cog_ext.cog_slash(
name="unban",
description="Unban a user",
options=[
create_option(
name="user",
description="User to unban",
option_type=3,
required=True,
),
create_option(
name="reason",
description="Unban reason",
required=True,
option_type=3,
),
],
)
@admin_or_permissions(ban_members=True)
async def _unban(
self,
ctx: SlashContext,
user: str,
reason: str,
):
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
orig_user = user
discrim = None
discord_ban_info = None
database_ban_info = None
bans = await ctx.guild.bans()
# Try to get ban information out of Discord
if re.match("^[0-9]{1,}$", user): # User ID
user = int(user)
discord_ban_info = find(lambda x: x.user.id == user, bans)
else: # User name
if re.match("#[0-9]{4}$", user): # User name has discrim
user, discrim = user.split("#")
if discrim:
discord_ban_info = find(
lambda x: x.user.name == user
and x.user.discriminator == discrim,
bans,
)
else:
results = [
x for x in filter(lambda x: x.user.name == user, bans)
]
if results:
if len(results) > 1:
active_bans = []
for ban in bans:
active_bans.append(
"{0} ({1}): {2}".format(
ban.user.name, ban.user.id, ban.reason
)
)
message = (
"More than one result. "
+ "Please use one of the following IDs:\n```"
+ "\n".join(active_bans)
+ "\n```"
)
await ctx.send(message)
return
else:
discord_ban_info = results[0]
# If we don't have the ban information in Discord,
# try to find the relevant information in the database.
# We take advantage of the previous checks to save CPU cycles
if not discord_ban_info:
if isinstance(user, int):
database_ban_info = Ban.get(
guild=ctx.guild.id, user=user, active=True
)
else:
search = {
"guild": ctx.guild.id,
"username": user,
"active": True,
}
if discrim:
search["discrim"] = discrim
database_ban_info = Ban.get(**search)
if not discord_ban_info and not database_ban_info:
await ctx.send(f"Unable to find user {orig_user}", hidden=True)
elif discord_ban_info:
await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
else:
discord_ban_info = find(
lambda x: x.user.id == database_ban_info["id"], bans
)
if discord_ban_info:
await self.discord_apply_unban(
ctx, discord_ban_info.user, reason
)
else:
database_ban_info.active = False
database_ban_info.update()
_ = Unban(
user=database_ban_info.user,
username=database_ban_info.username,
discrim=database_ban_info.discrim,
guild=ctx.guild.id,
admin=ctx.author.id,
reason=reason,
).insert()
await ctx.send(
"Unable to find user in Discord, "
+ "but removed entry from database."
)
@cog_ext.cog_subcommand(
base="bans",
name="list",
description="List bans",
options=[
create_option(
name="type",
description="Ban type",
option_type=4,
required=False,
choices=[
create_choice(value=0, name="All"),
create_choice(value=1, name="Permanent"),
create_choice(value=2, name="Temporary"),
create_choice(value=3, name="Soft"),
],
),
create_option(
name="active",
description="Active bans",
option_type=4,
required=False,
choices=[
create_choice(value=1, name="Yes"),
create_choice(value=0, name="No"),
],
),
],
)
@admin_or_permissions(ban_members=True)
async def _bans_list(
self, ctx: SlashContext, type: int = 0, active: int = 1
):
active = bool(active)
exists = self.check_cache(ctx, type=type, active=active)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
types = [0, "perm", "temp", "soft"]
search = {"guild": ctx.guild.id}
if active:
search["active"] = True
if type > 0:
search["type"] = types[type]
bans = Ban.get_many(**search)
bans.sort(key=lambda x: x.created_at, reverse=True)
db_bans = []
fields = []
for ban in bans:
if not ban.username:
user = await self.bot.fetch_user(ban.user)
ban.username = user.name if user else "[deleted user]"
fields.append(
Field(
name=f"Username: {ban.username}#{ban.discrim}",
value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n"
+ f"User ID: {ban.user}\n"
+ f"Reason: {ban.reason}\n"
+ f"Type: {ban.type}\n\u200b",
inline=False,
)
)
db_bans.append(ban.user)
if type == 0 and active:
bans = await ctx.guild.bans()
for ban in bans:
if ban.user.id not in db_bans:
fields.append(
Field(
name=f"Username: {ban.user.name}#"
+ f"{ban.user.discriminator}",
value="Date: [unknown]\n"
+ f"User ID: {ban.user.id}\n"
+ f"Reason: {ban.reason}\n"
+ "Type: manual\n\u200b",
inline=False,
)
)
pages = []
title = "Active " if active else "Inactive "
if type > 0:
title += types[type]
if type == 1:
title += "a"
title += "bans"
if len(fields) == 0:
embed = build_embed(
title=title,
description=f"No {'in' if not active else ''}active bans",
fields=[],
)
embed.set_thumbnail(url=ctx.guild.icon_url)
pages.append(embed)
else:
for i in range(0, len(bans), 5):
embed = build_embed(
title=title, description="", fields=fields[i : i + 5]
)
embed.set_thumbnail(url=ctx.guild.icon_url)
pages.append(embed)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=pages,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(pages) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"guild": ctx.guild.id,
"user": ctx.author.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"type": type,
"active": active,
"paginator": paginator,
}
await paginator.start()

88
jarvis/cogs/admin/kick.py Normal file
View file

@ -0,0 +1,88 @@
from discord import User
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Kick
from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class KickCog(CacheCog):
def __init__(self, bot):
super().__init__(bot)
@cog_ext.cog_slash(
name="kick",
description="Kick a user",
options=[
create_option(
name="user",
description="User to kick",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Kick reason",
required=False,
option_type=3,
),
],
)
@admin_or_permissions(kick_members=True)
async def _kick(self, ctx: SlashContext, user: User, reason=None):
if not user or user == ctx.author:
await ctx.send("You cannot kick yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
if not reason:
reason = "Mr. Stark is displeased with your presence. Please leave."
guild_name = ctx.guild.name
embed = build_embed(
title=f"You have been kicked from {guild_name}",
description=f"Reason: {reason}",
fields=[],
)
embed.set_author(
name=ctx.author.name + "#" + ctx.author.discriminator,
icon_url=ctx.author.avatar_url,
)
embed.set_thumbnail(ctx.guild.icon_url)
send_failed = False
try:
await user.send(embed=embed)
except Exception:
send_failed = True
await ctx.guild.kick(user, reason=reason)
fields = [Field(name="DM Sent?", value=str(not send_failed))]
embed = build_embed(
title="User Kicked",
description=f"Reason: {reason}",
fields=fields,
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=user.avatar_url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
_ = Kick(
user=user.id,
reason=reason,
admin=ctx.author.id,
guild=ctx.guild.id,
).insert()

133
jarvis/cogs/admin/lock.py Normal file
View file

@ -0,0 +1,133 @@
from typing import Union
from discord import Role, TextChannel, User, VoiceChannel
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Lock
from jarvis.utils.cachecog import CacheCog
class LockCog(CacheCog):
def __init__(self, bot: commands.Bot):
super().__init__(bot)
async def _lock_channel(
self,
channel: Union[TextChannel, VoiceChannel],
role: Role,
admin: User,
reason: str,
allow_send=False,
):
overrides = channel.overwrites_for(role)
if isinstance(channel, TextChannel):
overrides.send_messages = allow_send
elif isinstance(channel, VoiceChannel):
overrides.speak = allow_send
await channel.set_permissions(role, overwrite=overrides, reason=reason)
async def _unlock_channel(
self,
channel: Union[TextChannel, VoiceChannel],
role: Role,
admin: User,
):
overrides = channel.overwrites_for(role)
if isinstance(channel, TextChannel):
overrides.send_messages = None
elif isinstance(channel, VoiceChannel):
overrides.speak = None
await channel.set_permissions(role, overwrite=overrides)
@cog_ext.cog_slash(
name="lock",
description="Locks a channel",
options=[
create_option(
name="reason",
description="Lock Reason",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Lock duration in minutes (default 10)",
option_type=4,
required=False,
),
create_option(
name="channel",
description="Channel to lock",
option_type=7,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _lock(
self,
ctx: SlashContext,
reason: str,
duration: int = 10,
channel: Union[TextChannel, VoiceChannel] = None,
):
await ctx.defer(hidden=True)
if duration <= 0:
await ctx.send("Duration must be > 0", hidden=True)
return
elif duration >= 300:
await ctx.send("Duration must be < 5 hours", hidden=True)
return
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
if not channel:
channel = ctx.channel
for role in ctx.guild.roles:
try:
await self._lock_channel(channel, role, ctx.author, reason)
except Exception:
continue # Just continue on error
_ = Lock(
channel=channel.id,
guild=ctx.guild.id,
admin=ctx.author.id,
reason=reason,
duration=duration,
).insert()
await ctx.send(f"{channel.mention} locked for {duration} minute(s)")
@cog_ext.cog_slash(
name="unlock",
description="Unlocks a channel",
options=[
create_option(
name="channel",
description="Channel to lock",
option_type=7,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _unlock(
self,
ctx: SlashContext,
channel: Union[TextChannel, VoiceChannel] = None,
):
if not channel:
channel = ctx.channel
lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True)
if not lock:
await ctx.send(f"{channel.mention} not locked.", hidden=True)
return
for role in ctx.guild.roles:
try:
await self._unlock_channel(channel, role, ctx.author)
except Exception:
continue # Just continue on error
lock.active = False
lock.update()
await ctx.send(f"{channel.mention} unlocked")

View file

@ -0,0 +1,114 @@
from datetime import datetime
import pymongo
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.config import get_config
from jarvis.db import DBManager
from jarvis.db.types import Lock
from jarvis.utils.cachecog import CacheCog
class LockdownCog(CacheCog):
def __init__(self, bot: commands.Bot):
super().__init__(bot)
self.db = DBManager(get_config().mongo).mongo.jarvis
@cog_ext.cog_subcommand(
base="lockdown",
name="start",
description="Locks a server",
options=[
create_option(
name="reason",
description="Lockdown Reason",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Lockdown duration in minutes (default 10)",
option_type=4,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _lockdown_start(
self,
ctx: SlashContext,
reason: str,
duration: int = 10,
):
await ctx.defer(hidden=True)
if duration <= 0:
await ctx.send("Duration must be > 0", hidden=True)
return
elif duration >= 300:
await ctx.send("Duration must be < 5 hours", hidden=True)
return
channels = ctx.guild.channels
roles = ctx.guild.roles
updates = []
for channel in channels:
for role in roles:
try:
await self._lock_channel(channel, role, ctx.author, reason)
except Exception:
continue # Just continue on error
updates.append(
pymongo.InsertOne(
{
"channel": channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
"reason": reason,
"duration": duration,
"active": True,
"created_at": datetime.utcnow(),
}
)
)
if updates:
self.db.locks.bulk_write(updates)
await ctx.send(f"Server locked for {duration} minute(s)")
@cog_ext.cog_subcommand(
base="lockdown",
name="end",
description="Unlocks a server",
)
@commands.has_permissions(administrator=True)
async def _lockdown_end(
self,
ctx: SlashContext,
):
channels = ctx.guild.channels
roles = ctx.guild.roles
updates = []
locks = Lock.get_many(guild=ctx.guild.id, active=True)
if not locks:
await ctx.send("No lockdown detected.", hidden=True)
return
await ctx.defer()
for channel in channels:
for role in roles:
try:
await self._unlock_channel(channel, role, ctx.author)
except Exception:
continue # Just continue on error
updates.append(
pymongo.UpdateOne(
{
"channel": channel.id,
"guild": ctx.guild.id,
"admin": ctx.author.id,
},
{"$set": {"active": False}},
)
)
if updates:
self.db.locks.bulk_write(updates)
await ctx.send("Server unlocked")

136
jarvis/cogs/admin/mute.py Normal file
View file

@ -0,0 +1,136 @@
from discord import Member
from discord.ext import commands
from discord.utils import get
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Mute, Setting
from jarvis.utils import build_embed
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class MuteCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@cog_ext.cog_slash(
name="mute",
description="Mute a user",
options=[
create_option(
name="user",
description="User to mute",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Reason for mute",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Mute duration",
option_type=4,
required=False,
),
],
)
@admin_or_permissions(mute_members=True)
async def _mute(
self, ctx: SlashContext, user: Member, reason: str, duration: int = 30
):
if user == ctx.author:
await ctx.send("You cannot mute yourself.", hidden=True)
return
if user == self.bot.user:
await ctx.send("I'm afraid I can't let you do that", hidden=True)
return
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
mute_setting = Setting.get(guild=ctx.guild.id, setting="mute")
if not mute_setting:
await ctx.send(
"Please configure a mute role "
+ "with /settings mute <role> first",
hidden=True,
)
return
role = get(ctx.guild.roles, id=mute_setting.value)
if role in user.roles:
await ctx.send("User already muted", hidden=True)
return
await user.add_roles(role, reason=reason)
if duration < 0 or duration > 300:
duration = -1
_ = Mute(
user=user.id,
reason=reason,
admin=ctx.author.id,
guild=ctx.guild.id,
duration=duration,
active=True if duration >= 0 else False,
).insert()
embed = build_embed(
title="User Muted",
description=f"{user.mention} has been muted",
fields=[Field(name="Reason", value=reason)],
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=user.avatar_url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
@cog_ext.cog_slash(
name="unmute",
description="Unmute a user",
options=[
create_option(
name="user",
description="User to unmute",
option_type=6,
required=True,
)
],
)
@admin_or_permissions(mute_members=True)
async def _unmute(self, ctx: SlashContext, user: Member):
mute_setting = Setting.get(guild=ctx.guild.id, setting="mute")
if not mute_setting:
await ctx.send(
"Please configure a mute role with "
+ "/settings mute <role> first.",
hidden=True,
)
return
role = get(ctx.guild.roles, id=mute_setting.value)
if role in user.roles:
await user.remove_roles(role, reason="Unmute")
else:
await ctx.send("User is not muted.", hidden=True)
return
mutes = Mute.get_many(guild=ctx.guild.id, user=user.id)
for mute in mutes:
mute.active = False
mute.update()
embed = build_embed(
title="User Unmwaruted",
description=f"{user.mention} has been unmuted",
fields=[],
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=user.avatar_url)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)

145
jarvis/cogs/admin/purge.py Normal file
View file

@ -0,0 +1,145 @@
from discord import TextChannel
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Autopurge, Purge
from jarvis.utils.permissions import admin_or_permissions
class PurgeCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@cog_ext.cog_slash(
name="purge",
description="Purge messages from channel",
options=[
create_option(
name="amount",
description="Amount of messages to purge",
required=False,
option_type=4,
)
],
)
@admin_or_permissions(manage_messages=True)
async def _purge(self, ctx: SlashContext, amount: int = 10):
if amount < 1:
await ctx.send("Amount must be >= 1", hidden=True)
return
await ctx.defer()
channel = ctx.channel
messages = []
async for message in channel.history(limit=amount + 1):
messages.append(message)
await channel.delete_messages(messages)
_ = Purge(
channel=ctx.channel.id,
guild=ctx.guild.id,
admin=ctx.author.id,
count=amount,
).insert()
@cog_ext.cog_subcommand(
base="autopurge",
name="add",
description="Automatically purge messages after x seconds",
options=[
create_option(
name="channel",
description="Channel to autopurge",
option_type=7,
required=True,
),
create_option(
name="delay",
description="Seconds to keep message before purge, default 30",
option_type=4,
required=False,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_add(
self, ctx: SlashContext, channel: TextChannel, delay: int = 30
):
if not isinstance(channel, TextChannel):
await ctx.send("Channel must be a TextChannel", hidden=True)
return
if delay <= 0:
await ctx.send("Delay must be > 0", hidden=True)
return
elif delay > 300:
await ctx.send("Delay must be < 5 minutes", hidden=True)
return
autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
if autopurge:
await ctx.send("Autopurge already exists.", hidden=True)
return
autopurge = Autopurge(
guild=ctx.guild.id,
channel=channel.id,
admin=ctx.author.id,
delay=delay,
)
autopurge.insert()
await ctx.send(
f"Autopurge set up on {channel.mention}, "
+ f"delay is {delay} seconds"
)
@cog_ext.cog_subcommand(
base="autopurge",
name="remove",
description="Remove an autopurge",
options=[
create_option(
name="channel",
description="Channel to remove from autopurge",
option_type=7,
required=True,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel):
autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
if not autopurge:
await ctx.send("Autopurge does not exist.", hidden=True)
return
autopurge.delete()
await ctx.send(f"Autopurge removed from {channel.mention}.")
@cog_ext.cog_subcommand(
base="autopurge",
name="update",
description="Update autopurge on a channel",
options=[
create_option(
name="channel",
description="Channel to update",
option_type=7,
required=True,
),
create_option(
name="delay",
description="New time to save",
option_type=4,
required=True,
),
],
)
@admin_or_permissions(manage_messages=True)
async def _autopurge_update(
self, ctx: SlashContext, channel: TextChannel, delay: int
):
autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id)
if not autopurge:
await ctx.send("Autopurge does not exist.", hidden=True)
return
autopurge.delay = delay
autopurge.update()
await ctx.send(
f"Autopurge delay updated to {delay} seconds on {channel.mention}."
)

View file

@ -0,0 +1,91 @@
from discord import Role
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Setting
class RolepingCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@cog_ext.cog_subcommand(
base="roleping",
name="block",
description="Add a role to the roleping blocklist",
options=[
create_option(
name="role",
description="Role to add to blocklist",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
roles = Setting(guild=ctx.guild.id, setting="roleping", value=[])
if role.id in roles.value:
await ctx.send(
f"Role `{role.name}` already in blocklist.", hidden=True
)
return
roles.value.append(role.id)
roles.update()
await ctx.send(f"Role `{role.name}` added to blocklist.")
@cog_ext.cog_subcommand(
base="roleping",
name="allow",
description="Remove a role from the roleping blocklist",
options=[
create_option(
name="role",
description="Role to remove from blocklist",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
return
if role.id not in roles.value:
await ctx.send(
f"Role `{role.name}` not in blocklist.", hidden=True
)
return
roles.value.remove(role.id)
roles.update()
await ctx.send(f"Role `{role.name}` removed blocklist.")
@cog_ext.cog_subcommand(
base="roleping",
name="list",
description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
return
message = "Blocklisted Roles:\n```\n"
if not roles.value:
await ctx.send("No roles blocklisted.", hidden=True)
return
for role in roles.value:
role = ctx.guild.get_role(role)
if not role:
continue
message += role.name + "\n"
message += "```"
await ctx.send(message)

View file

@ -0,0 +1,215 @@
from datetime import datetime, timedelta
from ButtonPaginator import Paginator
from discord import User
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.model import ButtonStyle
from discord_slash.utils.manage_commands import create_choice, create_option
from jarvis.db.types import MongoSort, Warning
from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog
from jarvis.utils.field import Field
class WarningCog(CacheCog):
def __init__(self, bot):
super().__init__(bot)
@cog_ext.cog_slash(
name="warn",
description="Warn a user",
options=[
create_option(
name="user",
description="User to warn",
option_type=6,
required=True,
),
create_option(
name="reason",
description="Reason for warning",
option_type=3,
required=True,
),
create_option(
name="duration",
description="Duration of warning in hours, default 24",
option_type=4,
required=False,
),
],
)
@commands.has_permissions(administrator=True)
async def _warn(
self, ctx: SlashContext, user: User, reason: str, duration: int = 24
):
if len(reason) > 100:
await ctx.send("Reason must be < 100 characters", hidden=True)
return
if duration <= 0:
await ctx.send("Duration must be > 0", hidden=True)
return
elif duration >= 120:
await ctx.send("Duration must be < 5 days", hidden=True)
return
await ctx.defer()
_ = Warning(
user=user.id,
reason=reason,
admin=ctx.author.id,
guild=ctx.guild.id,
duration=duration,
active=True,
).insert()
fields = [Field("Reason", reason, False)]
embed = build_embed(
title="Warning",
description=f"{user.mention} has been warned",
fields=fields,
)
embed.set_author(
name=user.nick if user.nick else user.name,
icon_url=user.avatar_url,
)
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
await ctx.send(embed=embed)
@cog_ext.cog_slash(
name="warnings",
description="Get count of user warnings",
options=[
create_option(
name="user",
description="User to view",
option_type=6,
required=True,
),
create_option(
name="active",
description="View only active",
option_type=4,
required=False,
choices=[
create_choice(name="Yes", value=1),
create_choice(name="No", value=0),
],
),
],
)
@commands.has_permissions(administrator=True)
async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1):
active = bool(active)
exists = self.check_cache(ctx, user_id=user.id, active=active)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
warnings = Warning.get_many(
user=user.id,
guild=ctx.guild.id,
sort=MongoSort(direction="desc", key="created_at"),
)
active_warns = list(filter(lambda x: x.active, warnings))
pages = []
if active:
if len(active_warns) == 0:
embed = build_embed(
title="Warnings",
description=f"{len(warnings)} total | 0 currently active",
fields=[],
)
embed.set_author(name=user.name, icon_url=user.avatar_url)
embed.set_thumbnail(url=ctx.guild.icon_url)
pages.append(embed)
else:
fields = []
for warn in active_warns:
admin = ctx.guild.get(warn.admin)
admin_name = "||`[redacted]`||"
if admin:
admin_name = f"{admin.name}#{admin.discriminator}"
fields.append(
Field(
name=warn.created_at.strftime(
"%Y-%m-%d %H:%M:%S UTC"
),
value=f"{warn.reason}\n"
+ f"Admin: {admin_name}\n"
+ "\u200b",
inline=False,
)
)
for i in range(0, len(fields), 5):
embed = build_embed(
title="Warnings",
description=f"{len(warnings)} total | "
+ f"{len(active_warns)} currently active",
fields=fields[i : i + 5],
)
embed.set_author(
name=user.name + "#" + user.discriminator,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=ctx.guild.icon_url)
embed.set_footer(
text=f"{user.name}#{user.discriminator} | {user.id}"
)
pages.append(embed)
else:
fields = []
for warn in warnings:
title = "[A] " if warn.active else "[I] "
title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append(
Field(
name=title,
value=warn.reason + "\n\u200b",
inline=False,
)
)
for i in range(0, len(fields), 5):
embed = build_embed(
title="Warnings",
description=f"{len(warnings)} total | "
+ f"{len(active_warns)} currently active",
fields=fields[i : i + 5],
)
embed.set_author(
name=user.name + "#" + user.discriminator,
icon_url=user.avatar_url,
)
embed.set_thumbnail(url=ctx.guild.icon_url)
pages.append(embed)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=pages,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(pages) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"guild": ctx.guild.id,
"user": ctx.author.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"user_id": user.id,
"active": active,
"paginator": paginator,
}
await paginator.start()

33
jarvis/utils/cachecog.py Normal file
View file

@ -0,0 +1,33 @@
from datetime import datetime, timedelta
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import SlashContext
class CacheCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.cache = {}
self._expire_interaction.start()
def check_cache(self, ctx: SlashContext, **kwargs):
if not kwargs:
kwargs = {}
return find(
lambda x: x["command"] == ctx.subcommand_name
and x["user"] == ctx.author.id
and x["guild"] == ctx.guild.id
and all(x[k] == v for k, v in kwargs.items()),
self.cache.values(),
)
@loop(minutes=1)
async def _expire_interaction(self):
keys = list(self.cache.keys())
for key in keys:
if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta(
minutes=1
):
del self.cache[key]